摘要:下面將開始分析它的源碼。僅僅定義了一個最小應有的行為。更好的選擇由于該庫是為定制而生,故此有一些防御性判斷,源碼顯得略為。
本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog...前言
在ZStack(或者說產(chǎn)品化的IaaS軟件)中的任務通常有很長的執(zhí)行路徑,錯誤可能發(fā)生在路徑的任意一處。為了保證系統(tǒng)的正確性,需提供一種較為完善的回滾機制——在ZStack中,通過一個工作流引擎,ZStack的每一個步驟都被包裹在獨立的工作流中,可以在出錯的時候回滾。此外,通過在配置文件中組裝工作流的方式,關鍵的執(zhí)行路徑可以被配置,這使得架構(gòu)的耦合度進一步降低。
系統(tǒng)解耦合的手段除了之前文章所提到的分層、分割、分布等,還有一個重要手段是異步,業(yè)務之間的消息傳遞不是同步調(diào)用,而是將一個業(yè)務操作分成多個階段,每個階段之間通過共享數(shù)據(jù)的方式異步執(zhí)行進行協(xié)作。
這即是一種在業(yè)務設計原則中——流程可定義原則的具象化。接觸過金融行業(yè)的同學肯定知道,不同的保險理賠流程是不一樣的。而承保流程和理賠流程是分離的,在需要時進行關聯(lián),從而可以復用一些理賠流程,并提供一些個性化理賠流程。
演示代碼就以創(chuàng)建VM為例,在ZStack中大致可以分以下幾個步驟:
org.zstack.compute.vm.VmImageSelectBackupStorageFlow org.zstack.compute.vm.VmAllocateHostFlow org.zstack.compute.vm.VmAllocatePrimaryStorageFlow org.zstack.compute.vm.VmAllocateVolumeFlow org.zstack.compute.vm.VmAllocateNicFlow org.zstack.compute.vm.VmInstantiateResourcePreFlow org.zstack.compute.vm.VmCreateOnHypervisorFlow org.zstack.compute.vm.VmInstantiateResourcePostFlow
可以說是代碼即文檔了。在這里,ZStack顯式聲明這些Flow在Spring XML中,這些屬性將會被注入到createVmWorkFlowElements中。每一個Flow都被拆成了一個個較小的單元,好處不僅是將業(yè)務操作分成了多個階段易于回滾,還是可以有效復用這些Flow。這也是編程思想中“組合”的體現(xiàn)。
如何使用除了這種配置型聲明,還可以在代碼中靈活的使用這些FlowChain。在這里,我們將以Case來說明這些FlowChain的用法,避免對ZStack業(yè)務邏輯不熟悉的讀者看的一頭霧水。
一共有兩種可用的FlowChain:
SimpleFlowChain
ShareFlowChain
SimpleFlowChain我們先來看一個Case。
@Test public void test() { FlowChain chain = FlowChainBuilder.newShareFlowChain(); chain.then(new ShareFlow() { int a; @Override public void setup() { flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 1; increase(); trigger.next(); } }); flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 2; increase(); trigger.next(); } }); } }).done(new FlowDoneHandler(null) { @Override public void handle(Map data) { success = true; } }).start(); Assert.assertTrue(success); expect(2); }
我們可以看到,這就是一個工作流。完成一個工作流的時候(回調(diào)觸發(fā)時)執(zhí)行下一個工作流——由trigger.next觸發(fā)。不僅如此,還可以添加Rollback屬性。
@Test public void test() throws WorkFlowException { final int[] count = {0}; new SimpleFlowChain() .then(new Flow() { @Override public void run(FlowTrigger chain, Map data) { count[0]++; chain.next(); } @Override public void rollback(FlowRollback chain, Map data) { count[0]--; chain.rollback(); } }) .then(new Flow() { @Override public void run(FlowTrigger chain, Map data) { count[0]++; chain.next(); } @Override public void rollback(FlowRollback chain, Map data) { count[0]--; chain.rollback(); } }) .then(new Flow() { @Override public void run(FlowTrigger chain, Map data) { chain.fail(null); } @Override public void rollback(FlowRollback chain, Map data) { count[0]--; chain.rollback(); } }) .start(); Assert.assertEquals(-1, count[0]); }
rollback由FlowTrigger的fail觸發(fā)。這樣我們可以保證在發(fā)生一些錯誤的時候及時回滾,防止我們的系統(tǒng)處于一個有臟數(shù)據(jù)的中間狀態(tài)。同時,Map也可以用來在Flow之間傳遞上下文。
ShareFlowChainpublic class TestShareFlow { int[] count = {0}; boolean success; private void increase() { count[0]++; } private void decrease() { count[0]--; } private void expect(int ret) { Assert.assertEquals(count[0], ret); } @Test public void test() { FlowChain chain = FlowChainBuilder.newShareFlowChain(); chain.then(new ShareFlow() { int a; @Override public void setup() { flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 1; increase(); trigger.next(); } }); flow(new NoRollbackFlow() { @Override public void run(FlowTrigger trigger, Map data) { a = 2; increase(); trigger.next(); } }); } }).done(new FlowDoneHandler(null) { @Override public void handle(Map data) { success = true; } }).start(); Assert.assertTrue(success); expect(2); } @Before public void setUp() throws Exception { new BeanConstructor().build(); } }
比起SimpleFlowChain,ShareFlowChain則是一個Inner class,在相同的作用域里,傳遞數(shù)據(jù)變得更加的方便了。
它的實現(xiàn)在ZStack中,F(xiàn)lowChain作為核心庫,其實現(xiàn)也是非常的簡單(可以直接參考SimpleFlowChain和ShareFlowChain),本質(zhì)就是將任務放入List中,由內(nèi)部方法進行迭代,在此基礎上做了一系列操作。下面將開始分析它的源碼。
從接口說起public interface FlowChain { ListgetFlows(); FlowChain insert(Flow flow); FlowChain insert(int pos, Flow flow); FlowChain setFlowMarshaller(FlowMarshaller marshaller); FlowChain then(Flow flow); FlowChain done(FlowDoneHandler handler); FlowChain error(FlowErrorHandler handler); FlowChain Finally(FlowFinallyHandler handler); FlowChain setData(Map data); FlowChain putData(Map.Entry... es); FlowChain setName(String name); void setProcessors(List processors); Map getData(); void start(); FlowChain noRollback(boolean no); FlowChain allowEmptyFlow(); }
接口的名字非常的易懂,那么在這里就不多作解釋了。FlowChain僅僅定義了一個Flow最小應有的行為。
//定義了Flow的回滾操作接口 public interface FlowRollback extends AsyncBackup { //回滾操作 void rollback(); //設置跳過回滾操作 void skipRestRollbacks(); }
//定義了觸發(fā)器的行為接口 public interface FlowTrigger extends AsyncBackup { //觸發(fā)失敗,調(diào)用errorHandle void fail(ErrorCode errorCode); //觸發(fā)下一個flow void next(); //setError后,在下次調(diào)用next的時才會調(diào)用errorHandle void setError(ErrorCode error); }源碼解析 Flow
public interface Flow { void run(FlowTrigger trigger, Map data); void rollback(FlowRollback trigger, Map data); }
Flow的定義其實非常的簡單——一組方法。執(zhí)行和對應的回滾,一般在ZStack中都以匿名內(nèi)部類的方式傳入。
Chain的用法在之前的SimpleFlowChain的case中。我們可以看到一系列的鏈式調(diào)用,大致如下:
new SimpleFlowChain().then(new flow()).then(new flow()).then(new flow()).start();
then本質(zhì)是往List
public SimpleFlowChain then(Flow flow) { flows.add(flow); return this; }
再來看看start
@Override public void start() { // 檢測flow中是否設置了processors。一般用來打trace if (processors != null) { for (FlowChainProcessor p : processors) { p.processFlowChain(this); } } //如果flows為空但是之前在設置中允許為空,那么就直接直接done部分的邏輯。不然就報錯 if (flows.isEmpty() && allowEmptyFlow) { callDoneHandler(); return; } if (flows.isEmpty()) { throw new CloudRuntimeException("you must call then() to add flow before calling start() or allowEmptyFlow() to run empty flow chain on purpose"); } //每個flow必須有一個map,用來傳遞上下文 if (data == null) { data = new HashMap(); } //標記為已經(jīng)開始 isStart = true; //如果沒有名字的話給flow 取一個名字,因為很有可能是匿名使用的flow if (name == null) { name = "anonymous-chain"; } logger.debug(String.format("[FlowChain(%s): %s] starts", id, name)); //打印trace,方便調(diào)試 if (logger.isTraceEnabled()) { List names = CollectionUtils.transformToList(flows, new Function () { @Override public String call(Flow arg) { return String.format("%s[%s]", arg.getClass(), getFlowName(arg)); } }); logger.trace(String.format("execution path: %s", StringUtils.join(names, " --> "))); } //生成一個迭代器 it = flows.iterator(); //從it中獲取一個不需要跳過的flow開始執(zhí)行。如果沒有獲取到,就執(zhí)行done邏輯 Flow flow = getFirstNotSkippedFlow(); if (flow == null) { // all flows are skipped callDoneHandler(); } else { runFlow(flow); } }
再來看一下runFlow中的代碼
private void runFlow(Flow flow) { try { //看報錯信息就可以猜到在做什么防御措施了:如果一個transaction在一個flow中沒有被關閉而跳到下一個flow時,會拋出異常。這個防御機制來自于一個實習生寫的bug,當時被排查出來的時候花了非常大的力氣——現(xiàn)象非常的詭異。所以現(xiàn)在被寫在了這里。 if (TransactionSynchronizationManager.isActualTransactionActive()) { String flowName = null; String flowClassName = null; if (currentFlow != null) { flowName = getFlowName(currentFlow); flowClassName = currentFlow.getClass().getName(); } throw new CloudRuntimeException(String.format("flow[%s:%s] opened a transaction but forgot closing it", flowClassName, flowName)); } //toRun就是一個當前要run的flow Flow toRun = null; if (flowMarshaller != null) { //flowMarshaller 實際上是一個非常惡心的玩意兒。尤其在一些配置好掉的xml flow突然因為一些條件而改變接下來執(zhí)行的flow令人很無語...但是也提供了一些靈活性。 toRun = flowMarshaller.marshalTheNextFlow(currentFlow == null ? null : currentFlow.getClass().getName(), flow.getClass().getName(), this, data); if (toRun != null) { logger.debug(String.format("[FlowChain(%s): %s] FlowMarshaller[%s] replaces the next flow[%s] to the flow[%s]", id, name, flowMarshaller.getClass(), flow.getClass(), toRun.getClass())); } } if (toRun == null) { toRun = flow; } if (CoreGlobalProperty.PROFILER_WORKFLOW) { //對flow的監(jiān)視。比如flow的執(zhí)行時間等 stopWatch.start(toRun); } currentFlow = toRun; String flowName = getFlowName(currentFlow); String info = String.format("[FlowChain(%s): %s] start executing flow[%s]", id, name, flowName); logger.debug(info); //在flow中還允許定義afterDone afterError afterFinal的行為。稍后將會介紹 collectAfterRunnable(toRun); //終于到了run,這里就是調(diào)用者傳入的行為來決定run中的邏輯 toRun.run(this, data); //fail的邏輯稍后解析 } catch (OperationFailureException oe) { String errInfo = oe.getErrorCode() != null ? oe.getErrorCode().toString() : ""; logger.warn(errInfo, oe); fail(oe.getErrorCode()); } catch (FlowException fe) { String errInfo = fe.getErrorCode() != null ? fe.getErrorCode().toString() : ""; logger.warn(errInfo, fe); fail(fe.getErrorCode()); } catch (Throwable t) { logger.warn(String.format("[FlowChain(%s): %s] unhandled exception when executing flow[%s], start to rollback", id, name, flow.getClass().getName()), t); fail(errf.throwableToInternalError(t)); } }
fail
@Override public void fail(ErrorCode errorCode) { isFailCalled = true; setErrorCode(errorCode); //放入Stack中,之后Rollback會根據(jù)Stack中的flow順序來 rollBackFlows.push(currentFlow); //rollback會對this.rollBackFlows中flow按照順序調(diào)用rollback rollback(); }FlowTrigger
//定義了觸發(fā)器的行為接口 public interface FlowTrigger extends AsyncBackup { //觸發(fā)失敗,調(diào)用errorHandle void fail(ErrorCode errorCode); //觸發(fā)下一個flow void next(); //setError后,在下次調(diào)用next的時才會調(diào)用errorHandle void setError(ErrorCode error); }
之前已經(jīng)看過fail的代碼。接下來來看看next和setError。
@Override public void next() { //如果flow沒有run起來的情況下,是不能調(diào)用next的 if (!isStart) { throw new CloudRuntimeException( String.format("[FlowChain(%s): %s] you must call start() first, and only call next() in Flow.run()", id, name)); } //當rollback開始的時候也不允許next if (isRollbackStart) { throw new CloudRuntimeException( String.format("[FlowChain(%s): %s] rollback has started, you can"t call next()", id, name)); } //將當前flow的push進rollback用的stack rollBackFlows.push(currentFlow); logger.debug(String.format("[FlowChain(%s): %s] successfully executed flow[%s]", id, name, getFlowName(currentFlow))); //獲取下一個flow。在這里才是真正意義上的next Flow flow = getFirstNotSkippedFlow(); if (flow == null) { // no flows, or all flows are skipped if (errorCode == null) { callDoneHandler(); } else { callErrorHandler(false); } } else { runFlow(flow); } }
可以看一下getFirstNotSkippedFlow,本質(zhì)上是利用了迭代器的特性。
private Flow getFirstNotSkippedFlow() { Flow flow = null; while (it.hasNext()) { flow = it.next(); if (!isSkipFlow(flow)) { break; } } return flow; }
接下來是setError
@Override public void setError(ErrorCode error) { setErrorCode(error); } //往下看 private void setErrorCode(ErrorCode errorCode) { this.errorCode = errorCode; }
根據(jù)之前的next邏輯:
if (flow == null) { // no flows, or all flows are skipped if (errorCode == null) { callDoneHandler(); } else { callErrorHandler(false); } } else { runFlow(flow); }
我們可以大致猜想到,如果在next的時候當前error不為空,則調(diào)用錯誤handle。這樣在setError后還可以做一些事情。
無論是調(diào)用errorHandle還是doneHandle,都會調(diào)用finalHandle。finalHandle也允許用戶定義這部分的邏輯,使flow更加的靈活。
更好的選擇由于該庫是為ZStack定制而生,故此有一些防御性判斷,源碼顯得略為verbose。如果有同學對此感興趣,想將其應用到自己的系統(tǒng)中,筆者推薦使用:jdeferred。
Java Deferred/Promise library similar to JQuery
由于JavaScript 中的代碼都是異步調(diào)用的。簡單說,它的思想是,每一個異步任務返回一個Promise對象,該對象有一個then方法,允許指定回調(diào)函數(shù)。
在這里列出幾個較為簡單的示范,或者有興趣的讀者也可以參考這里:
import org.jdeferred.DeferredManager; import org.jdeferred.Promise; import org.jdeferred.impl.DefaultDeferredManager; import org.junit.After; import org.junit.Assert; import org.junit.Test; import java.util.concurrent.TimeUnit; public class deferSimpleTest { private static int var = 0; final DeferredManager dm = new DefaultDeferredManager(); @After public void cleanUp() { var = 0; } @Test public void test() { Promise p1 = dm.when(() -> { var += 1; }).then(result -> { var += 1; }); Promise p2 = dm.when(() -> { var += 1; }).then(result -> { var += 1; }); dm.when(p1, p2).done(Void -> var += 1); Assert.assertEquals(5, var); } @Test public void test2() { final DeferredManager dm = new DefaultDeferredManager(); Promise promise = dm.when(() -> { var += 1; }).then(result -> { var += 1; }); dm.when(promise).done(Void -> var += 1); Assert.assertEquals(3, var); } @Test public void testBadCallback() { Promise promise = dm.when(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); dm.when(promise).done(Void -> { var += 1; throw new RuntimeException("this exception is expected"); } ).fail(Void -> { System.out.print("fail!"); var -= 1; }); Assert.assertEquals(0, var); } }
如果你在使用Java8,那么也可以通過CompletableFuture來得到“類似”的支持。
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/70845.html
摘要:因為這個狀態(tài)下,是交給一個線程在執(zhí)行的,見源碼剖析之核心庫鑒賞中的分析。并且允許等行為。上面提到過,允許運行暫停取消等行為。維護和相應的之間的關系。則停止執(zhí)行并觸發(fā)之前的所有。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack中,當用戶在UI上發(fā)起操作時,前端會調(diào)用后端的API對實際的資源發(fā)起操作請求。但在一個分布式系統(tǒng)中...
摘要:但在實際的二次開發(fā)中,這些做法未必能夠完全滿足需求。在源碼剖析之核心庫鑒賞一文中,我們了解到是的基礎設施之一,同時也允許通過顯示聲明的方式來聲明。同理,一些也可以使用繼承進行擴展。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack博文-5.通用插件系統(tǒng)中,官方提出了幾個較為經(jīng)典的擴展方式。但在實際的二次開發(fā)中,這些做法未必...
摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個關鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動時利用反射所做的一個行為。因此并不會影響使用時的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個關鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...
摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個關鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動時利用反射所做的一個行為。因此并不會影響使用時的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個關鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...
摘要:每個消息都會被一個線程消費,同時最大并發(fā)量為。然后提交一個任務到線程池中,這個任務的內(nèi)容是從等待隊列中取出一個,如果等待隊列為空,則刪除這個等待隊列的。小結(jié)本文分析了的久經(jīng)生產(chǎn)考驗的核心組件線程池。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack中,最基本的執(zhí)行單位不僅僅是一個函數(shù),也可以是一個任務(Task。其本質(zhì)實現(xiàn)...
閱讀 2178·2023-04-26 01:56
閱讀 3175·2021-11-18 10:02
閱讀 3155·2021-09-09 11:35
閱讀 1415·2021-09-03 10:28
閱讀 3488·2019-08-29 18:36
閱讀 2916·2019-08-29 17:14
閱讀 890·2019-08-29 16:10
閱讀 1670·2019-08-26 13:45