摘要:現(xiàn)在用方式調用接口,中使用方式輸入內容日志平臺網(wǎng)關層基于。日志平臺網(wǎng)關層基于到此為止,提取經(jīng)過網(wǎng)關的接口信息,并將其寫入日志文件就完成了,所有的接口日志都寫入了文件中。
背景介紹 1、問題現(xiàn)狀與嘗試
沒有做日志記錄的線上系統(tǒng),絕對是給系統(tǒng)運維人員留下的坑。尤其是前后端分離的項目,后端的接口日志可以解決對接、測試和運維時的很多問題。之前項目上發(fā)布的接口都是通過Oracle Service Bus(OSB)來做統(tǒng)一編排,在編排時加上日志記錄,并將接口日志存儲到數(shù)據(jù)庫中。最后基于接口日志數(shù)據(jù)開發(fā)日志平臺,來統(tǒng)一的接口日志分析。
但我們總不能為了記錄日志而使用OSB,這樣很不自由。今年我們有很多后臺接口使用Spring來開發(fā),后臺程序的部署環(huán)境也不局限于Oracle中間件的環(huán)境。當某些場景時,脫離了OSB,我們該如何記錄接口日志,這是本文要解決的問題。
在我寫的Spring系列的文章中,有嘗試過使用Spring的AOP來記錄日志。在每個項目的代碼中,定義一個記錄日志的切面,該切面會對該項目下的所有接口做日志記錄。
對于一個周期很長、規(guī)模很大的一個獨立項目來說,這個方案是可行的。因為項目周期很長,花個兩天做日志記錄的AOP開發(fā)沒啥問題,而且這個日志更契合該系統(tǒng)的業(yè)務特征。
但我們團隊所面對的開發(fā),基本上都是數(shù)量多、周期短的一些小項目。一個項目的開發(fā)周期可能只有十天,就算每個項目在日志記錄上只用一天的工作量,所占的比重也有十分之一。如果我們每個項目都要獨立的記錄日志,累積的工作量也挺大的,而且重復這樣的工作很枯燥。
就像面向切面編程(AOP),在一個項目的所有接口上設置“切面”統(tǒng)一編程。如果我們的能在所有的項目上設置“切面”統(tǒng)一編程,就能解決我們現(xiàn)在的問題。這個“切面”就是網(wǎng)關。
這個方案是公司內的兩位技術大佬討論出來的,這樣驚奇的想法,讓之前困擾的一切迷霧都豁然開朗了起來。我花了兩天做了個Demo,驗證方案的確行得通,下文會附上本次Demo中實戰(zhàn)操作的代碼。
簡單來說,所有項目接口都通過Nginx的網(wǎng)關,而我們不需要在代碼層面上收集日志,而是在Nginx上獲取想要的日志信息,配合ELKF(Elasticsearch、Logstash、Kibana、Filebeat)的解決方案,實現(xiàn)統(tǒng)一的日志平臺搭建:
Nginx+Lua編程,按照我們定義的格式,所有通過網(wǎng)關的接口都會留下日志信息,寫入log文件。
Filebeat收集數(shù)據(jù),F(xiàn)ilebeat實時監(jiān)測目標log文件,收集數(shù)據(jù)推送給Logstash。
Logstash過濾處理數(shù)據(jù),Logstash過濾處理數(shù)據(jù)后,會將數(shù)據(jù)同時推送給Elasticsearch和Kafka。
Elasticsearch+Kibana,Elasticsearch作為數(shù)據(jù)的搜索引擎,而且利用Kibana的可視化界面,將日志數(shù)據(jù)以報表的形式顯示出來。
Kafka消息隊列中間件,日志的數(shù)據(jù)被推送到Kafka上之后發(fā)布消息,而所有訂閱者就能從隊列中讀數(shù)據(jù)。本次就是寫程序實時的讀取隊列中的數(shù)據(jù),存入數(shù)據(jù)庫。
3、系統(tǒng)環(huán)境在本次Demo中,由于資源限制,所有的產品服務都將部署在一臺服務器上,服務器上的相關環(huán)境如下:
配置項 | 環(huán)境配置信息 |
---|---|
服務器 | 阿里云服務器ECS(公網(wǎng):47.96.238.21 ,私網(wǎng):172.16.187.25) |
服務器配置 | 2 vCPU + 4 GB內存 |
JDK版本 | JDK 1.8.0_181 |
操作系統(tǒng) | CentOS 7.4 64位 |
OpenResty | 1.13.6.2 |
Filebeat | 6.2.4 |
Elasticsearch | 6.2.4 |
Logstash | 6.2.4 |
Kibana | 6.2.4 |
Kafka | 2.10-0.10.2.1 |
OpenResty? 是一個基于 Nginx 與 Lua 的高性能 Web 平臺,其內部集成了大量精良的 Lua 庫、第三方模塊以及大多數(shù)的依賴項。用于方便地搭建能夠處理超高并發(fā)、擴展性極高的動態(tài) Web 應用、Web 服務和動態(tài)網(wǎng)關。
我們選擇OpenResty的目的有兩個:(1)使用Lua編程,可以在Nginx上更好的拿到想要的日志信息;(2)系統(tǒng)其它功能模塊的集成,例如Jwt的集成,可參考同事寫的文章《Nginx實現(xiàn)JWT驗證-基于OpenResty實現(xiàn)》。
在安裝OpenResty之前需要先安裝好依賴庫,OpenResty 依賴庫有: perl 5.6.1+, libreadline, libpcre, libssl。我們是CentOS系統(tǒng),可以直接yum來安裝。
[root@Kerry ~]# yum install readline-devel pcre-devel openssl-devel perl
接下來我們在當前CentOS系統(tǒng)上使用新的官方 yum 源
[root@Kerry ~]# yum install yum-utils [root@Kerry ~]# yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
這時我們就可以直接安裝OpenResty
[root@Kerry ~]# yum install openresty [root@Kerry ~]# yum install openresty-resty
這樣OpenResty就安裝完成了,默認情況下程序會被安裝到 /usr/local/openresty 目錄
# 可查看安裝成功 [root@Kerry ~]# cd /usr/local/openresty/bin/ [root@Kerry bin]# ./openresty -v nginx version: openresty/1.13.6.2 # 設置環(huán)境變量 [root@Kerry sbin]# vi /etc/profile # 在文件最后面加上 export PATH=${PATH}:/usr/local/openresty/nginx/sbin [root@Kerry sbin]# source /etc/profile2、記錄Nginx日志
OpenResty 安裝之后就有配置文件及相關的目錄的,為了工作目錄與安裝目錄互不干擾,我們多帶帶建一個工作目錄。我在根目錄下新建了 /openrestyTest/v1/ 的文件夾,并在該目錄下創(chuàng)建 logs 和 conf 子目錄分別用于存放日志和配置文件。
[root@Kerry ~]# mkdir /openrestyTest /openrestyTest/v1 /openrestyTest/v1/conf /openrestyTest/v1/logs [root@Kerry ~]# cd /openrestyTest/v1/conf/ # 創(chuàng)建并編輯 nginx.conf [root@Kerry conf]# vi nginx.conf
在nginx.conf中復制以下文本作為測試
worker_processes 1; #nginx worker 數(shù)量 error_log logs/error.log; #指定錯誤日志文件路徑 events { worker_connections 1024; } http { server { #監(jiān)聽端口,若你的6699端口已經(jīng)被占用,則需要修改 listen 6699; location / { default_type text/html; content_by_lua_block { ngx.say("HelloWorld") } } } }
該語法是基于Lua,監(jiān)聽6699端口,輸出HelloWorld。我們現(xiàn)在啟動Openresty中的Nginx。
[root@Kerry ~]# /usr/local/openresty/nginx/sbin/nginx -p "/openrestyTest/v1/" -c conf/nginx.conf # 由于配置或環(huán)境變量,也可以直接使用 [root@Kerry ~]# nginx -p "/openrestyTest/v1/" -c conf/nginx.conf [root@Kerry conf]# curl http://localhost:6699 HelloWorld
訪問該端口地址,成功的顯示HelloWorld。我提前在本服務器的Tomcat上部署了一個接口,端口是8080。我的想法是將8080反向代理成9000,將所有通過8080端口的服務的日志信息獲取到,并輸出到本地的log文件中。
我暫時需要記錄的日志內容包括:接口地址,請求內容,請求時間,響應內容,響應時間等。代碼寫好了,直接替換 /openrestyTest/v1/conf/nginx.conf 的文件內容。
worker_processes 1; error_log logs/error.log; events { worker_connections 1024; } http { log_format myformat "{"status":"$status","requestTime":"$requestTime","responseTime":"$responseTime","requestURL":"$requestURL","method":"$method","requestContent":"$request_body","responseContent":"$responseContent"}"; access_log logs/test.log myformat; upstream tomcatTest { server 47.96.238.21:8080; } server { server_name 47.96.238.21; listen 9000; # 默認讀取 body lua_need_request_body on; location / { log_escape_non_ascii off; proxy_pass http://tomcatTest; set $requestURL ""; set $method ""; set $requestTime ""; set $responseTime ""; set $responseContent ""; body_filter_by_lua " ngx.var.requestTime=os.date("%Y-%m-%d %H:%M:%S") ngx.var.requestURL=ngx.var.scheme.."://"..ngx.var.server_name..":"..ngx.var.server_port..ngx.var.request_uri ngx.var.method=ngx.var.request_uri local resp_body = string.sub(ngx.arg[1], 1, 1000) ngx.ctx.buffered = (ngx.ctx.buffered or"") .. resp_body if ngx.arg[2] then ngx.var.responseContent = ngx.ctx.buffered end ngx.var.responseTime=os.date("%Y-%m-%d %H:%M:%S") "; } } }
重新啟動Nginx,然后進行驗證
[root@Kerry conf]# nginx -p "/openrestyTest/v1/" -c conf/nginx.conf -s reload
我準備好的接口地址為:http://47.96.238.21:8080/springboot-demo/hello ,該接口返回的結果都是“Hello!Spring boot”。
現(xiàn)在用POST方式調用接口http://47.96.238.21:9000/springboot-demo/hello,Request中使用application/json方式輸入內容:“segmentFault《日志平臺(網(wǎng)關層) - 基于Openresty+ELKF+Kafka》”。然后查看logs文件夾,發(fā)現(xiàn)多了個 test.log 文件,我們查看該文件。就可以發(fā)現(xiàn),當我們每調用一次接口,就會同步的輸出接口日志到該文件中。
[root@Kerry conf]# tail -500f /openrestyTest/v1/logs/test.log {"status":"200","requestTime":"2018-10-11 18:09:02","responseTime":"2018-10-11 18:09:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"segmentFault《日志平臺(網(wǎng)關層) - 基于Openresty+ELKF+Kafka》","responseContent":"Hello!Spring boot!"}
到此為止,提取經(jīng)過Nginx網(wǎng)關的接口信息,并將其寫入日志文件就完成了,所有的接口日志都寫入了 test.log 文件中。
E+L+K+F=日志數(shù)據(jù)處理ELKF是 Elastic + Logstash + Kibana + FileBeat 四個組件的組合,可能ELK對于大家來說更熟悉,ELKF只不過多了Filebeat,它們都是Elastic公司推出的開源產品。剛好這幾天Elastic公司成功上市,掀起了一波ELKF產品討論的熱潮。
原ELK架構中,Logstash負責收集日志信息并上報,但后來Elastic公司又推出了Filebeat,大家發(fā)現(xiàn)Filebeat在日志文件收集上效果更好,就只讓Logstash負責日志的處理和上報了。在這個系統(tǒng)中,Elastic充當一個搜索引擎,Logstash為日志分析上報系統(tǒng),F(xiàn)ileBeat為日志文件收集系統(tǒng),Kibana為此系統(tǒng)提供可視化的Web界面。
Filebeat:輕量型日志采集器,負責采集文件形式的日志,并將采集來的日志推送給logstash進行處理。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.4-x86_64.rpm [root@Kerry install]# yum localinstall -y filebeat-6.2.4-x86_64.rpm
安裝完成后,我們開始配置Filebeat來采集日志,并推送給Logstash。
[root@Kerry install]# cd /etc/filebeat/ [root@Kerry filebeat]# vi filebeat.yml
該filebeat.yml是filebeat的配置文件,里面大部分的模塊都被注釋了,本次配置放開的代碼有;
filebeat.prospectors: - type: log enabled: true paths: - /openrestyTest/v1/logs/*.log filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false setup.template.settings: index.number_of_shards: 3 output.logstash: hosts: ["47.96.238.21:5044"]
監(jiān)聽 /openrestyTest/v1/logs/ 目錄下的log文件,采集的日志信息輸出到logstash,該hosts等我們安裝啟動了Logstash再說,先啟動Filebeat。
[root@Kerry filebeat]# cd /usr/share/filebeat/bin/ [root@Kerry bin]# touch admin.out [root@Kerry bin]# nohup ./filebeat -e -c /etc/filebeat/filebeat.yml > admin.out & # 查看admin.out 日志,是否啟動成功2、Logstash安裝配置
Logstash:日志處理工具,負責日志收集、轉換、解析等,并將解析后的日志推送給ElasticSearch進行檢索。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.rpm [root@Kerry install]# yum localinstall -y logstash-6.2.4.rpm #Logstash不建議用root啟動 [root@Kerry install]# group add logstash [root@Kerry install]# useradd -g logstash logstash [root@Kerry install]# passwd logstash # 設置密碼 [root@Kerry install]# su logstash [root@Kerry install]# mkdir -pv /data/logstash/{data,logs} [root@Kerry install]# chown -R logstash.logstash /data/logstash/ [root@Kerry install]# vi /etc/logstash/conf.d/logstash.conf
創(chuàng)建并編輯/etc/logstash/conf.d/logstash.conf 文件,配置如下:
input { beats { port => 5044 codec => plain { charset => "UTF-8" } } } output { elasticsearch { hosts => "47.96.238.21:9200" manage_template => false index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" document_type => "%{[@metadata][type]}" } }
1、input:是指Logstash的數(shù)據(jù)來源,啟動后使用5044來監(jiān)聽,是否很熟悉,就是上節(jié)Filebeat推送日志的hosts。
2、output;是Logstash輸出數(shù)據(jù)的位置,我們這里定義為elasticsearch,下文中會說到,用于ELK架構中的日志分析
接下來我們修改/etc/logstash/logstash.yml
#vim /etc/logstash/logstash.yml path.data: /data/logstash/data path.logs: /data/logstash/logs
現(xiàn)在可以啟動Logstash了
[root@Kerry install]# su logstash [logstash@Kerry root]$ cd /usr/share/logstash/bin/ [logstash@Kerry bin]$ touch admin.out [logstash@Kerry bin]$ nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &3、Elasticsearch安裝配置
ElasticSearch:是一個分布式的RESTful風格的搜索和數(shù)據(jù)分析引擎,同時還提供了集中存儲功能,它主要負責將logstash抓取來的日志數(shù)據(jù)進行檢索、查詢、分析等。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.rpm [root@Kerry install]# yum localinstall -y elasticsearch-6.2.4.rpm #Elasticsearch不建議用root啟動 [root@Kerry install]# group add elsearch [root@Kerry install]# useradd -g elsearch elsearch [root@Kerry install]# passwd elsearch # 設置密碼 [root@Kerry install]# su elsearch [elsearch@Kerry bin]$ mkdir -pv /data/elasticsearch/{data,logs} [elsearch@Kerry bin]$ chown -R elsearch.elsearch /data/elasticsearch/ [elsearch@Kerry bin]$ vi /etc/elasticsearch/elasticsearch.yml path.data: /data/elasticsearch/data path.logs: /data/elasticsearch/logs network.host: 0.0.0.0 http.port: 9200
如果想要外網(wǎng)能訪問,host就必須要設成0.0.0.0。Elasticsearch的啟動如下
[root@Kerry install]# su elsearch [elsearch@Kerry bin]$ cd /usr/share/elasticsearch/bin/ [elsearch@Kerry bin]$ ./elasticsearch -d # -d 保證后臺啟動4、Kibana安裝配置
Kibana:Web前端,可以將ElasticSearch檢索后的日志轉化為各種圖表,為用戶提供數(shù)據(jù)可視化支持。
[root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-x86_64.rpm [root@Kerry install]# yum localinstall -y kibana-6.2.4-x86_64.rpm [root@Kerry install]# vi /etc/kibana/kibana.yml server.port: 5601 server.host: "0.0.0.0" elasticsearch.url: "http://47.96.238.21:9200"
同樣的,host為0.0.0.0,保證外網(wǎng)能訪問。Kibana只作為前端展示,日志數(shù)據(jù)的獲取還是借助于elasticsearch,所以這里配置了elasticsearch.url。接著啟動Kibana,就能通過頁面看到日志的報表。
[root@Kerry ~]# cd /usr/share/kibana/bin/ [root@Kerry bin]# touch admin.out [root@Kerry bin]# nohup ./kibana >admin.out &
我們在瀏覽器上訪問 http://47.96.238.21:5601/ ,正常來說就能訪問Kibana的頁面。如果 ELKF一整套配置沒問題,就能在Kibana的頁面上實時的看到所有日志信息。
從Kafka到數(shù)據(jù)庫在拿到日志的數(shù)據(jù)后,通過Elasticsearch和Kibana,已經(jīng)完成了一個日志查看的平臺。但我們自己項目內部也已經(jīng)開發(fā)了日志平臺,希望把這些日志接入到之前的日志平臺中;或者我們希望定制化一個更符合實際使用的日志平臺,這些都需要把拿到的日志數(shù)據(jù)存儲到數(shù)據(jù)庫里。
但所有日志的記錄,很明顯處于高并發(fā)環(huán)境,很容易由于來不及同步處理,導致請求發(fā)生堵塞。比如說,大量的insert,update之類的請求同時到達數(shù)據(jù)庫,直接導致無數(shù)的行鎖表鎖,甚至最后請求會堆積過多,從而觸發(fā)too many connections錯誤。通過使用消息隊列,我們可以異步處理請求,從而緩解系統(tǒng)的壓力。在比對市場上開源的消息中間件后,我選擇了Kafka。
Apache Kafka是一個分布式的發(fā)布-訂閱消息系統(tǒng),能夠支撐海量數(shù)據(jù)的數(shù)據(jù)傳遞。在離線和實時的消息處理業(yè)務系統(tǒng)中,Kafka都有廣泛的應用。Kafka將消息持久化到磁盤中,并對消息創(chuàng)建了備份保證了數(shù)據(jù)的安全。Kafka主要特點是基于Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸。0.8版本開始支持復制,不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務的數(shù)據(jù)收集業(yè)務。
Broker:Kafka的broker是無狀態(tài)的,broker使用Zookeeper維護集群的狀態(tài)。Leader的選舉也由Zookeeper負責。
Zookeeper:Zookeeper負責維護和協(xié)調broker。當Kafka系統(tǒng)中新增了broker或者某個broker發(fā)生故障失效時,由ZooKeeper通知生產者和消費者。生產者和消費者依據(jù)Zookeeper的broker狀態(tài)信息與broker協(xié)調數(shù)據(jù)的發(fā)布和訂閱任務。
Producer:生產者將數(shù)據(jù)推送到broker上,當集群中出現(xiàn)新的broker時,所有的生產者將會搜尋到這個新的broker,并自動將數(shù)據(jù)發(fā)送到這個broker上。
Consumer:因為Kafka的broker是無狀態(tài)的,所以consumer必須使用partition
offset來記錄消費了多少數(shù)據(jù)。如果一個consumer指定了一個topic的offset,意味著該consumer已經(jīng)消費了該offset之前的所有數(shù)據(jù)。consumer可以通過指定offset,從topic的指定位置開始消費數(shù)據(jù)。consumer的offset存儲在Zookeeper中。
我們開始Kafka的安裝和啟動
# 安裝 [root@Kerry ~]# cd /u01/install/ [root@Kerry install]# wget http://apache.fayea.com/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz [root@Kerry install]# tar -zvxf kafka_2.10-0.10.2.1.tgz -C /usr/local/ [root@Kerry install]# cd /usr/local/ [root@Kerry local]# mv kafka_2.10-0.10.2.1 kafka # 啟動 [root@Kerry local]# cd /usr/local/kafka/bin/ [root@Kerry bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties [root@Kerry bin]# touch admin.out [root@Kerry bin]# nohup ./kafka-server-start.sh ../config/server.properties >admin.out &
創(chuàng)建一個topic,命名為 kerry
[root@Kerry bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kerry # topic創(chuàng)建成功,下面查看一下 [root@Kerry bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 kerry
我們往這個topic中發(fā)送信息
[root@Kerry bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic kerry Hello Kerry!this is the message for test
我們再開一個窗口,從topic中接受消息
[root@Kerry bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning Hello Kerry!this is the message for test # 能成功接收到2、生產者:Logstash
Kafka已經(jīng)安裝好了,也建好了topic,而我希望往topic中發(fā)送消息的對象(生產者)是Logstash。即Logstash從Filebeat中獲取數(shù)據(jù)后,除了輸出給Elasticsearch以外,還輸出給Logstash,Logstash作為Kafka的生產者。
這里需要修改一下Logstash的配置文件,在output中再加上kafka的信息
vi /etc/logstash/conf.d/logstash.conf input { beats { port => 5044 codec => plain { charset => "UTF-8" } } } output { elasticsearch { hosts => "47.96.238.21:9200" manage_template => false index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" document_type => "%{[@metadata][type]}" } kafka { bootstrap_servers => "localhost:9092" #生產者 topic_id => "kerry" #設置寫入kafka的topic compression_type => "snappy" codec => plain { format => "%{message}" } } }
重啟Logstash
[root@Kerry bin]# cd /usr/share/logstash/bin [root@Kerry bin]# ps -ef|grep logstash # kill 進程 [root@Kerry bin]# nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &
我們再用POST方式調用之前的測試接口http://47.96.238.21:9000/springboot-demo/hello,請求request為:“這是對kafka的測試”。然后再查看從topic中接受消息
[root@Kerry bin]#./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning {"status":"200","requestTime":"2018-10-12 09:40:02","responseTime":"2018-10-12 09:40:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"這是對kafka的測試","responseContent":"Hello!Spring boot!"}
可以成功的接收到推送過來的日志消息
3、消費者:Springboot編程日志已經(jīng)可以保證能夠持續(xù)不斷的推送到Kafka中,那么就需要有消費者訂閱這些消息,寫入到數(shù)據(jù)庫。我用Spring boot寫了個程序,用來訂閱Kafka的日志,重要代碼如下:
1、application.yml
spring: # kafka kafka: # kafka服務器地址(可以多個) bootstrap-servers: 47.96.238.21:9092 consumer: # 指定一個默認的組名 group-id: kafka1 # earliest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 # latest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區(qū)下的數(shù)據(jù) # none:topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常 auto-offset-reset: earliest # key/value的反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: # key/value的序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 批量抓取 batch-size: 65536 # 緩存容量 buffer-memory: 524288 # 服務器地址 bootstrap-servers: 47.96.238.21:9092
2、POM.xml
org.springframework.kafka spring-kafka 1.0.6.RELEASE
3、KafkaController.java
package df.log.kafka.nginxlog.controller; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.naming.InitialContext; import javax.sql.DataSource; import java.sql.Connection; @RestController @EnableAutoConfiguration public class KafkaController { @RequestMapping("/hello") public String hello(){ return "Hello!Kerry. This is NginxLog program"; } /** * 監(jiān)聽信息 */ @KafkaListener(topics = "kerry" ) public void receive(ConsumerRecord, ?> consumer) { // kafkaLog 就是獲取到的日志信息 String kafkaLog = (String) consumer.value(); System.out.println("收到一條消息:"+kafkaLog); // 存入數(shù)據(jù)庫的代碼省略 } }
當程序部署之后,@KafkaListener(topics = "kerry") 會持續(xù)監(jiān)聽topics 為kerry的消息。我們再調用之前的測試接口,會發(fā)現(xiàn)新的接口日志會被持續(xù)監(jiān)聽到,在控制臺上打印出來,并存入數(shù)據(jù)庫。
尾聲本次操作文檔是記錄Demo的過程,很多地方并不成熟,例如:如何在 Nginx+Lua 時獲取更加全面的日志信息;在Logstash上對日志進行再加工;寫出漂亮的Spring boot 代碼,使得能夠很平緩的做寫入數(shù)據(jù)庫,用好Kibana的圖表等等。
我們下一步就是在項目的生產環(huán)境上正式的搭建日志平臺,我們已經(jīng)有了rancher環(huán)境,這套架構計劃用微服務的方式實現(xiàn)。后續(xù)的搭建文檔會持續(xù)更新。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://www.ezyhdfw.cn/yun/40139.html
摘要:整理自楊波老師的總結注冊中心支持模型存儲和靈活健康檢查能力。服務網(wǎng)關選擇是最佳搭配,但異步性能不足基于的異步未推出正式版。配置中心缺失治理能力。監(jiān)控存儲依賴于時間序列數(shù)據(jù)庫。隊列對于日志等可靠性要求不高的場景,用。功能強大但復雜。 整理自楊波老師的總結 showImg(https://segmentfault.com/img/bV3iL1?w=800&h=512); 注冊中心 Eur...
摘要:于是便誕生了隨行付分布式文件系統(tǒng)簡稱,提供的海量安全低成本高可靠的云存儲服務。子系統(tǒng)相關流程圖如下核心實現(xiàn)主要為隨行付各個業(yè)務系統(tǒng)提供文件共享和訪問服務,并且可以按應用統(tǒng)計流量命中率空間等指標。 背景 傳統(tǒng)Web應用中所有的功能部署在一起,圖片、文件也在一臺服務器;應用微服務架構后,服務之間的圖片共享通過FTP+Nginx靜態(tài)資源的方式進行訪問,文件共享通過nfs磁盤掛載的方式進行訪問...
摘要:個推針對服務場景,基于和搭建了微服務框架,提高了開發(fā)效率。三容器化在微服務落地實踐時我們選擇了,下面將詳細介紹個推基于的實踐。 2016年伊始Docker無比興盛,如今Kubernetes萬人矚目。在這個無比需要創(chuàng)新與速度的時代,由容器、微服務、DevOps構成的云原生席卷整個IT界。個推針對Web服務場景,基于OpenResty和Node.js搭建了微服務框架,提高了開發(fā)效率。在微服...
閱讀 4153·2021-11-18 13:22
閱讀 1894·2021-11-17 09:33
閱讀 2936·2021-09-26 09:46
閱讀 1278·2021-08-21 14:11
閱讀 2954·2019-08-30 15:53
閱讀 2770·2019-08-30 15:52
閱讀 2113·2019-08-30 10:52
閱讀 1588·2019-08-29 15:30