摘要:前言最近因公司業(yè)務(wù)需求,需要使用到大數(shù)據(jù)分析。提供的可用于處理無盡的數(shù)據(jù)流。類似于把一個(gè)記錄拆分成兩條三條甚至是四條記錄例如把一個(gè)字符串分割成一個(gè)字符數(shù)組。是一個(gè)聚合操作,如計(jì)數(shù)求和求平均等。實(shí)現(xiàn)把兩個(gè)流連成一個(gè)流。
前言
最近因公司業(yè)務(wù)需求,需要使用到大數(shù)據(jù)分析。選擇了flink,第一次聽說flink我也是很懵逼的狀態(tài),不過一段時(shí)間下來有了一點(diǎn)心得,在這里和大家分享分享。有很多描述不準(zhǔn)確的,大家多提提意見。
1.flink是什么,為什么要flink?
其實(shí)大數(shù)據(jù)框架有很多,比如Hadoop(批處理),Storm(流處理),Samza(流處理),Spark...但是我們選擇的是flink,為什么呢?因?yàn)閒link是“流式批處理”,flink將每一項(xiàng)視作真正的數(shù)據(jù)流。Flink提供的DataStream API可用于處理無盡的數(shù)據(jù)流。Flink可配合使用的基本組件包括:
Stream(流)是指在系統(tǒng)中流轉(zhuǎn)的,永恒不變的無邊界數(shù)據(jù)集
Operator(操作方)是指針對(duì)數(shù)據(jù)流執(zhí)行操作以產(chǎn)生其他數(shù)據(jù)流的功能
Source(源)是指數(shù)據(jù)流進(jìn)入系統(tǒng)的入口點(diǎn)
Sink(槽)是指數(shù)據(jù)流離開Flink系統(tǒng)后進(jìn)入到的位置,槽可以是數(shù)據(jù)庫(kù)或到其他系統(tǒng)的連接器
說了這么多,我們做一個(gè)簡(jiǎn)單的demo來體驗(yàn)一下flink:
假設(shè)我們?cè)陔娚唐脚_(tái),需要近實(shí)時(shí)(5min)統(tǒng)計(jì)(1h內(nèi))商品點(diǎn)擊量的前三名。然后實(shí)時(shí)展示出來。如果使用java,我們需要做一個(gè)定時(shí)任務(wù),監(jiān)聽商品點(diǎn)擊事件,然后每5min使用sql計(jì)算一下...如果數(shù)據(jù)量小,間隔時(shí)間比較長(zhǎng),還比較好,如果數(shù)據(jù)量大,間隔時(shí)間比較短...那服務(wù)器的壓力就會(huì)賊大...但是使用flink會(huì)怎么樣呢?先看下代碼(40幾W條數(shù)據(jù)從阿里淘寶獲取,github上):
/**
Created by liuliang
on 2019/5/24
*/
public class HotItems {
public static void main(String[] args) throws Exception { // 創(chuàng)建 execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 告訴系統(tǒng)按照 EventTime 處理 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 為了打印到控制臺(tái)的結(jié)果不亂序,配置全局的并發(fā)為1,改變并發(fā)對(duì)結(jié)果正確性沒有影響 env.setParallelism(1); // URL fileUrl = HotItems.class.getClassLoader().getResource("D:mftcodesflink-learnningsrcmainjavacncrawlermftUserBehavior.csv"); Path filePath = Path.fromLocalFile(new File("D:mftcodesflink-learnningsrcmainjavacncrawlermftUserBehavior.csv")); // 抽取 UserBehavior 的 TypeInformation,是一個(gè) PojoTypeInfo PojoTypeInfopojoType = (PojoTypeInfo ) TypeExtractor.createTypeInfo(UserBehavior.class); // 由于 Java 反射抽取出的字段順序是不確定的,需要顯式指定下文件中字段的順序 String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"}; // 創(chuàng)建 PojoCsvInputFormat PojoCsvInputFormat csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder); env // 創(chuàng)建數(shù)據(jù)源,得到 UserBehavior 類型的 DataStream .createInput(csvInput, pojoType) // 抽取出時(shí)間和生成 watermark .assignTimestampsAndWatermarks(new AscendingTimestampExtractor () { @Override public long extractAscendingTimestamp(UserBehavior userBehavior) { // 原始數(shù)據(jù)單位秒,將其轉(zhuǎn)成毫秒 return userBehavior.timestamp * 1000; } }) // 過濾出只有點(diǎn)擊的數(shù)據(jù) .filter(new FilterFunction () { @Override public boolean filter(UserBehavior userBehavior) throws Exception { // 過濾出只有點(diǎn)擊的數(shù)據(jù) return userBehavior.behavior.equals("pv"); } }) .keyBy("itemId") .timeWindow(Time.minutes(60), Time.minutes(5)) .aggregate(new CountAgg(), new WindowResultFunction()) .keyBy("windowEnd") .process(new TopNHotItems(3)) .print(); env.execute("Hot Items Job"); } /** 求某個(gè)窗口中前 N 名的熱門點(diǎn)擊商品,key 為窗口時(shí)間戳,輸出為 TopN 的結(jié)果字符串 */ public static class TopNHotItems extends KeyedProcessFunction { private final int topSize; public TopNHotItems(int topSize) { this.topSize = topSize; } // 用于存儲(chǔ)商品與點(diǎn)擊數(shù)的狀態(tài),待收齊同一個(gè)窗口的數(shù)據(jù)后,再觸發(fā) TopN 計(jì)算 private ListState itemState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ListStateDescriptor itemsStateDesc = new ListStateDescriptor<>( "itemState-state", ItemViewCount.class); itemState = getRuntimeContext().getListState(itemsStateDesc); } @Override public void processElement( ItemViewCount input, Context context, Collector collector) throws Exception { // 每條數(shù)據(jù)都保存到狀態(tài)中 itemState.add(input); // 注冊(cè) windowEnd+1 的 EventTime Timer, 當(dāng)觸發(fā)時(shí),說明收齊了屬于windowEnd窗口的所有商品數(shù)據(jù) context.timerService().registerEventTimeTimer(input.windowEnd + 1); } @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector out) throws Exception { // 獲取收到的所有商品點(diǎn)擊量 List allItems = new ArrayList<>(); for (ItemViewCount item : itemState.get()) { allItems.add(item); } // 提前清除狀態(tài)中的數(shù)據(jù),釋放空間 itemState.clear(); // 按照點(diǎn)擊量從大到小排序 allItems.sort(new Comparator () { @Override public int compare(ItemViewCount o1, ItemViewCount o2) { return (int) (o2.viewCount - o1.viewCount); } }); // 將排名信息格式化成 String, 便于打印 StringBuilder result = new StringBuilder(); result.append("==================================== "); result.append("時(shí)間: ").append(new Timestamp(timestamp-1)).append(" "); for (int i=0; i { @Override public void apply( Tuple key, // 窗口的主鍵,即 itemId TimeWindow window, // 窗口 Iterable aggregateResult, // 聚合函數(shù)的結(jié)果,即 count 值 Collector collector // 輸出類型為 ItemViewCount ) throws Exception { Long itemId = ((Tuple1 ) key).f0; Long count = aggregateResult.iterator().next(); collector.collect(ItemViewCount.of(itemId, window.getEnd(), count)); } } /** COUNT 統(tǒng)計(jì)的聚合函數(shù)實(shí)現(xiàn),每出現(xiàn)一條記錄加一 */ public static class CountAgg implements AggregateFunction { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehavior userBehavior, Long acc) { return acc + 1; } @Override public Long getResult(Long acc) { return acc; } @Override public Long merge(Long acc1, Long acc2) { return acc1 + acc2; } } /** 商品點(diǎn)擊量(窗口操作的輸出類型) */ public static class ItemViewCount { public long itemId; // 商品ID public long windowEnd; // 窗口結(jié)束時(shí)間戳 public long viewCount; // 商品的點(diǎn)擊量 public static ItemViewCount of(long itemId, long windowEnd, long viewCount) { ItemViewCount result = new ItemViewCount(); result.itemId = itemId; result.windowEnd = windowEnd; result.viewCount = viewCount; return result; } } /** 用戶行為數(shù)據(jù)結(jié)構(gòu) **/ public static class UserBehavior { public long userId; // 用戶ID public long itemId; // 商品ID public int categoryId; // 商品類目ID public String behavior; // 用戶行為, 包括("pv", "buy", "cart", "fav") public long timestamp; // 行為發(fā)生的時(shí)間戳,單位秒 }
}
實(shí)時(shí)模擬的結(jié)果:
==================================== 時(shí)間: 2017-11-26 09:05:00.0 No0: 商品ID=5051027 瀏覽量=3 No1: 商品ID=3493253 瀏覽量=3 No2: 商品ID=4261030 瀏覽量=3 ==================================== ==================================== 時(shí)間: 2017-11-26 09:10:00.0 No0: 商品ID=812879 瀏覽量=5 No1: 商品ID=2600165 瀏覽量=4 No2: 商品ID=2828948 瀏覽量=4 ==================================== ==================================== 時(shí)間: 2017-11-26 09:15:00.0 No0: 商品ID=812879 瀏覽量=7 No1: 商品ID=138964 瀏覽量=5 No2: 商品ID=4568476 瀏覽量=5 ==================================== ==================================== 時(shí)間: 2017-11-26 09:20:00.0 No0: 商品ID=812879 瀏覽量=8 No1: 商品ID=2338453 瀏覽量=8 No2: 商品ID=2563440 瀏覽量=7 ====================================
可以看到,我們用比較簡(jiǎn)單的代碼,就實(shí)現(xiàn)了熱點(diǎn)TOP n的問題.可見flink使用起來還是很方便的(至少比java方便不少)。
2.flink這么強(qiáng)大?為甚?
從上一個(gè)例子里面,我們已經(jīng)初步體會(huì)到了flink的方便之處。我想從一下幾個(gè)方面解釋一下:
支持多種窗口
支持table api 【第二講介紹】
exactly-once (正好一次) 【第二講介紹】
1. 支持多種窗口
1.1 關(guān)于flink窗口我手動(dòng)畫了一個(gè)簡(jiǎn)單的圖:
1.2flink窗口函數(shù)
窗口函數(shù)就是這四個(gè):ReduceFunction,AggregateFunction,F(xiàn)oldFunction,ProcessWindowFunction.(當(dāng)然也可以自定義window)
3.flink工作流程?
dataSource -> DataTransformation(*) ->dataSink
3.1 登陸監(jiān)控demo了解 dataSource和dataSink
dataSource: 基于本地集合的 source、基于文件的 source、基于網(wǎng)絡(luò)套接字的 source、自定義的 source 自定義source: a:flink提供了很多定義好的sourceFunction 比如Kafka,RabbitMq,Mysql... b:StreamExecutionEnvironment.addSource(sourceFunction) 自己寫sourceFunction (實(shí)現(xiàn)ParallelSourceFunction / RichParallelSourceFunction ) dataSink: 寫入文件、打印出來、寫入 socket 、自定義的 sink 自定義的sink a:同理,dataSink提供了很多定義好的dataSink... b:自定義dataSink
3.2 DataTransformation(*)
簡(jiǎn)單的Transformation示意圖【圖2】 Transformation:數(shù)據(jù)轉(zhuǎn)換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等, 操作很多,可以將數(shù)據(jù)轉(zhuǎn)換計(jì)算成你想要的數(shù)據(jù)。 hello-demo 注【1】
4.flink在我們測(cè)試環(huán)境上集成的demo
1:登陸異地監(jiān)控 (講清楚架構(gòu)關(guān)系) 2:代理樹
5.flink怎么發(fā)布?web操作界面簡(jiǎn)單介紹。
打jar包,設(shè)置參數(shù)(并發(fā)度,main函數(shù)等),上傳
注:
【1】
map就是做一些映射,比如我們把兩個(gè)字符串合并成一個(gè)字符串,把一個(gè)字符串拆成兩個(gè)或者三個(gè)字符串。
flatMap類似于把一個(gè)記錄拆分成兩條、三條、甚至是四條記錄,例如把一個(gè)字符串分割成一個(gè)字符數(shù)組。
Filter就類似于過濾。
keyBy就等效于SQL里的group by。
aggregate是一個(gè)聚合操作,如計(jì)數(shù)、求和、求平均等。
reduce就類似于MapReduce里的reduce。
join操作就有點(diǎn)類似于我們數(shù)據(jù)庫(kù)里面的join。
connect實(shí)現(xiàn)把兩個(gè)流連成一個(gè)流。
repartition是一個(gè)重新分區(qū)操作(還沒研究)。
project操作就類似于SQL里面的snacks(還沒研究)
【以上涉及到的代碼,我已經(jīng)上傳到github上面:https://github.com/iamcrawler...】
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/74910.html
摘要:前言這一講將介紹一下序列化機(jī)制和過程函數(shù)。然而由于的類型擦除,自動(dòng)提取并不是總是有效。開發(fā)者在自定義類上使用注解,隨后創(chuàng)建相應(yīng)的并覆蓋方法。 前言 這一講將介紹一下序列化機(jī)制和過程函數(shù)(processfunction)。 序列化機(jī)制 使用 Flink 編寫處理邏輯時(shí),新手總是容易被林林總總的概念所混淆: 為什么 Flink 有那么多的類型聲明方式? BasicTypeInfo.ST...
摘要:由于配置流是從關(guān)系型數(shù)據(jù)庫(kù)中讀取,速度較慢,導(dǎo)致實(shí)時(shí)數(shù)據(jù)流流入數(shù)據(jù)的時(shí)候,配置信息還未發(fā)送,這樣會(huì)導(dǎo)致有些實(shí)時(shí)數(shù)據(jù)讀取不到配置信息。從數(shù)據(jù)庫(kù)中解析出來,再去統(tǒng)計(jì)近兩周占比。 showImg(https://segmentfault.com/img/remote/1460000019367651); Flink 學(xué)習(xí)項(xiàng)目代碼 https://github.com/zhisheng17/f...
摘要:基于在阿里巴巴搭建的平臺(tái)于年正式上線,并從阿里巴巴的搜索和推薦這兩大場(chǎng)景開始實(shí)現(xiàn)。在經(jīng)過一番調(diào)研之后,阿里巴巴實(shí)時(shí)計(jì)算認(rèn)為是一個(gè)非常適合的選擇。接下來,我們聊聊阿里巴巴在層對(duì)又大刀闊斧地進(jìn)行了哪些改進(jìn)。 Apache Flink 概述 Apache Flink(以下簡(jiǎn)稱Flink)是誕生于歐洲的一個(gè)大數(shù)據(jù)研究項(xiàng)目,原名StratoSphere。該項(xiàng)目是柏林工業(yè)大學(xué)的一個(gè)研究性項(xiàng)目,早期...
閱讀 1309·2021-11-24 09:39
閱讀 470·2019-08-30 14:12
閱讀 2676·2019-08-30 13:10
閱讀 2503·2019-08-30 12:44
閱讀 1020·2019-08-29 16:31
閱讀 937·2019-08-29 13:10
閱讀 2525·2019-08-27 10:57
閱讀 3216·2019-08-26 13:57