追溯事件
自動修正已處理錯誤事件的後果。
2005 年 12 月 12 日
這是 進階企業應用程式架構開發 的一部分,我在 2000 年代中期撰寫。遺憾的是,自那時起,太多其他事情吸引了我的注意力,所以我沒有時間進一步處理它們,我也看不到在可預見的未來會有太多時間。因此,這些材料很大程度上仍是草稿形式,在我有時間再次處理它們之前,我不會進行任何更正或更新。
企業應用程式在於擷取來自世界各地的資訊,對該資訊執行各種運算,並在該世界中啟動進一步的動作。它們執行的運算和啟動的動作只能與它們接收的資訊一樣準確。如果輸入有錯誤,輸出也會出錯。
我們習慣這種現象,但一旦發現輸入錯誤,通常很難修正。在多數情況下,人類必須找出系統對應錯誤輸入執行的動作、系統如何反應、系統應該如何反應(如果取得正確輸入)以及如何修正問題。
事件溯源 的吸引力之一在於它提供了讓這項繁重任務變得更輕鬆的基礎。事實上,仔細記錄事件及其後果,讓人類更容易進行修正。追溯事件 更進一步,讓系統可以自動修正許多錯誤事件的結果。
追溯事件 建立在 事件溯源 的基礎上,因此在了解 追溯事件 之前,您需要熟悉 事件溯源。特別是,我將使用我在 事件溯源 中定義的許多術語。
運作方式
處理追溯事件表示我們的目前應用程式狀態在某種程度上不正確。如果我們收到追溯事件,我們將處於與現在不同的應用程式狀態。您可以將這視為三個 平行模型
- 不正確的現實:有目前的即時應用程式狀態,它沒有考慮追溯事件。
- 正確分支:應用程式狀態,我們應該在追溯事件處理後取得。
- 更正後的現實:我們最終想要達成的應用程式狀態。
在許多情況下,更正後的現實會與正確分支相同,也就是說,我們會重新處理事件記錄,以考慮追溯事件。有時我們無法完全做到這一點。
我們的步驟是找出不正確的現實和正確分支的分歧點。這基本上是事件記錄中應該處理追溯事件的點。我將稱之為分支點,再次使用源碼控制世界的術語。分支點是事件記錄中應插入追溯事件之前的點。
我們可以建構分支點的兩種方法:重建和倒帶。
- 在重建中,我們將應用程式狀態還原到追溯事件之前的最後一個快照。然後,我們處理所有事件,直到到達分支點。
- 在倒帶/重播中,我們將事件從最新事件倒轉到分支點。
我們可以在平行模型中建構這個分支點,或者我們可以將我們的應用程式狀態還原到分支點。如果我們還原我們的應用程式狀態,那麼當我們前進時,我們將自動建立我們的正確分支,而更正後的現實將是相同的。如果我們使用平行模型,我們將在平行模型中擁有正確的分支,並且需要對應用程式狀態進行更改,以將其轉換為更正後的現實,基本上將差異合併到應用程式狀態中。
有三大類追溯事件:順序錯誤事件、拒絕事件和不正確事件。順序錯誤事件是接收過晚的事件,足夠晚,以至於您已經處理了應該在收到順序錯誤事件後處理的事件。拒絕事件是您現在意識到是錯誤的事件,並且不應該被處理。不正確事件是您收到關於事件不正確資訊的事件。
三個事件各需要你在分支點後對事件串流執行不同的操作。
- 對於失序事件,我們在分支點插入並處理追溯事件。
- 對於拒絕的事件,我們反轉事件並將其標記為拒絕。這實際上是刪除事件記錄
- 對於不正確的事件,我們反轉原始事件並將其標記為拒絕。我們插入追溯事件並處理它。你可以將這視為拒絕的事件和失序事件被視為一體處理。
標記為拒絕的事件會被所有進一步處理忽略,因此它們不會在重播期間重新處理或在未來的倒帶中反轉。它們保留在事件記錄中以維護歷史記錄,但除此之外會被忽略。我們不需要這樣做,作為替代方案,我們可以在原始事件後立即放置一個反轉,但顯然這樣一個接一個地處理和反轉事件效率很低。
如果事件的不正確資訊改變了處理順序,不正確的事件可能會引入進一步的複雜性。因此,我們讓一艘船在 4 月 1 日到達,但我們的更正讓它在 3 月 1 日到達。對於這些情況,我們的分支點是拒絕事件和正確事件中較早的一個。我們的下一個處理取決於哪一個較早。如果,如本例所示,正確的事件較早,我們處理它並將舊事件標記為拒絕。然後我們可以向前重播,舊事件將被跳過。如果舊事件先發生,那麼我們反轉它,重播到新事件,處理它並繼續向前處理。
當我們討論這個時,請記住追溯事件本身永遠是一個事件。對於失序事件,這沒有任何區別,但它確實影響其他兩種情況。考慮拒絕事件的情況 - 實際上這是刪除事件記錄中的事件。然而,事件記錄的重點在於我們永遠不會刪除事件。因此,我們可以做的是將拒絕事件插入記錄,處理該拒絕會執行我所描述的更改。拒絕事件本身總是會被拒絕,因此它永遠不會在重建時處理。不正確的事件可以用相同的方式處理,使用包含舊事件和新事件的替換事件。事實上,你可以將拒絕事件視為沒有更正事件的替換事件。
我剛剛說過,被拒絕和替換的事件總是會被拒絕,所以它們不會重播。如果您總是希望您的平行模型使用當前最佳知識來建立,這句話是正確的。但是,如果您正在構建一個雙時間平行模型,也就是根據我們在過去某個日期所知道的,那麼您將執行更複雜的處理,這將考慮到一些被拒絕的事件。
從平行模型合併
如果您遵循在平行模型中建立正確分支的風格,那麼您必須找出正確分支和當前現實之間的差異,然後合併這些差異以形成更正的現實。
要找出變更,您需要檢查兩個模型中的所有物件,以查看哪些物件已變更,以及檢查只存在於一個模型中的任何物件。顯然這可能是一項龐大的任務,您可以透過追蹤自分支點以來受處理事件影響的物件來減少這項任務。您也可以執行一些類似選擇性重播的分析,以確定哪些物件子集會因追溯事件而變更。
找到變更後,您現在需要將這些變更套用於即時模型。值得考慮是否將這些變更作為事件本身套用。這不是絕對必要的,因為所有變更都應該可以從初始事件的分析中完全計算出來,但由於這種分析相當複雜,因此在自己的事件中擷取合併變更可能很有用。
顯然,這種合併處理可能會變得非常複雜,在您決定如何處理追溯事件時,合併處理的複雜性是一個重要的考量因素。
最佳化
由於處理追溯事件與建立平行模型非常相似,因此許多最佳化討論也適用於此。反轉、快照和選擇性重播的技術都可以幫助您在較少的事件處理下到達分支點。
如果您使用選擇性重播到達分支點,則可以使用相同的選擇性重播來處理分支點之後的事件。
測試想法
與反轉一起使用的特別有用的測試技術是,始終確保您新增的任何行為在重播下都能正常運作,方法是在測試案例的事件中新增一個追溯事件,這將強制重播測試中的整個事件順序。
另一個測試想法(儘管我還沒有直接看到任何案例)是隨機產生事件序列,然後以隨機不同的順序處理它們,以確保它們始終導致相同的應用程式狀態。
更新外部系統
事件溯源總是會導致與未以相同方式建置的外部系統不匹配。透過常規事件溯源,我們必須確保在重建期間不會將更新訊息傳送給外部系統。這相對容易,只需關閉閘道即可。然而,透過追溯事件,我們必須更進一步。我們需要找出追溯事件是否對我們應該進行的更新造成任何變更,然後在通知中處理這些變更。
此問題包含兩個部分:偵測和修正。
基本的偵測方案是確定在不正確的現實中傳送了哪些更新,在正確的分支中應該傳送哪些更新,然後找出兩個更新之間有差異的任何更新。
執行此操作的一個好方法是透過將每個外部更新轉換為事件,對閘道本身使用事件溯源。如果我們執行此操作,我們可以在不同的模型中擷取這些事件並將它們進行比較。為執行此操作,我們確保我們有一個機制來擷取所有外部更新事件並建立我們的兩個清單。我們感興趣的是從分支點以來傳送出的所有更新事件。如果我們使用倒帶,如果我們確保事件反轉傳送我們可以擷取的更新事件,我們可以擷取不正確的現實更新。
一旦我們有了不正確的現實和正確分支的兩個更新事件清單,我們就會比較它們以尋找不匹配項。我們可以忽略兩個清單中相同的任何更新,重要的是找出我們在一個清單中有事件但在另一個清單中沒有事件的情況。然後,我們可以建立兩個不同的事件清單。這些更新事件就是我們需要修正的事項。
我上面定義的所有內容都可以通用地完成,通用組件可以追蹤事件並找出兩個不匹配事件清單。然而,處理不匹配項是必須針對每個外部系統個別編寫程式,具體取決於外部系統需要採取的補償措施。很有可能無法自動化修正,而是必須由人為介入來解決混亂。但至少系統可以向人為提供相當好的說明,說明發生了什麼事,以及應該發生什麼事。
何時使用
使用追溯事件的主要原因是,當自動執行過去輸入的修正時很有用。與任何自動化一樣,您必須查看修正的頻率和難度。如果花費大量時間進行修正,那麼值得考慮使用追溯事件等工具來自動化它們。使用追溯事件自動化的最大優點在於,它是一個完全通用的流程,一旦您讓追溯事件為任何一個事件工作,那麼將其擴展到其他事件就相對容易了。
我很少看到追溯事件,這是有充分理由的。為了實作追溯事件,您需要準備一些重要的前置作業。您需要事件溯源,如果這還不夠,您需要增加可逆性或平行模型。這些不是小的前提條件。因此,建立一個支援追溯事件的應用程式是一個重要的決定,會影響整個系統。採用現有系統並新增必要的重構也不是微不足道的。進行這種自動化錯誤修正的需求通常不是早期需求,因此很容易最終得到一個需要大量工作才能讓追溯事件成為可能的設計。
如果一個系統有許多連結到外部系統,那麼這會讓使用追溯事件增加相當大的複雜性。完整的追溯處理需要完全存取資訊才能執行,並會產生外部更新的每個小變更。如果您與外部系統有大量的整合(這是常見的情況),您需要仔細查看如何處理它們的追溯活動,以了解追溯事件是否是一個可行的使用方式。
請記住,追溯事件並非全有或全無的選擇。你可以透過幾種方式限制追溯事件以縮小其範圍,但仍保留一些實用性。其中一種縮減方式是僅讓追溯事件套用在系統的子集上,特別是外部影響較小的區域。事實上,我見過它用於帳戶。另一種縮減範圍的方式是時間,許多業務營運都符合固定的週期,例如每週、每月、每年。在這種情況下,你可能只對目前處理週期(例如上週)內的事件使用追溯事件,在事情結束之前。你也可以在週期內使用較積極形式的追溯事件,並在封閉週期中使用較被動的形式。
我認為追溯事件罕見的最後一個原因是,我不認為人們很了解如何讓它發生。我希望這個模式能有助於消除這個障礙,並進而幫助我們找到其他人。
範例:使用倒轉的追溯貨物(C#)
我用一個範例說明了大多數關於事件溯源的範例,這個範例是基於將貨物在港口間透過船隻運送。我將繼續使用那個範例,這次來看看如何使用追溯事件。
首先,我們需要針對如何執行追溯事件做出一些策略性決策。對於這個範例,我將使用透過倒轉即時模型來達到分支點的方法。由於這個範例很簡單,我不會使用任何選擇性重播,而只會倒轉完整的事件清單。
由於我們使用的是倒轉,因此所有事件都必須是可逆的。我不會深入探討事件處理或反轉的詳細資訊,請參閱事件溯源中的範例,了解其運作方式。
追溯事件行為是由一種特殊類型的事件觸發的。
類別 ReplacementEvent...
private DomainEvent original, replacement; public DomainEvent Original {get { return original; }} public DomainEvent Replacement {get { return replacement; }} public ReplacementEvent(DomainEvent oldEvent, DomainEvent replacement) : base(oldEvent.Occurred) { this.original = oldEvent; this.replacement = replacement; } internal override void Process() { throw new Exception("Replacements should not be processed directly"); } internal override void Reverse() { throw new Exception("Cannot reverse replacements"); }
你會注意到,在這些情況下,我已封鎖基本流程和反轉方法。在這些範例中,我大多數時候都偏好讓事件處理自己的處理邏輯。替換不同:它們不包含任何網域邏輯,而且其處理涉及事件佇列的知識以及對該事件佇列的處理。與事件佇列的密切互動是我希望僅留給事件處理器處理的事情,因此事實上,我將所有替換行為都建置到事件佇列中。
類別 EventProcessor...
public void Process(DomainEvent ev) { try { if (ev is ReplacementEvent) ProcessReplacement((ReplacementEvent) ev); else if (OutOfOrder(ev)) ProcessOutOfOrder(ev); else BasicProcessEvent(ev); } catch (Exception ex) { ev.ProcessingError = ex; if (ShouldRethrowExceptions) throw ex; } InsertToLog(ev); }
您會在此處注意到,我正在執行物件導向程式設計的重大錯誤:根據方法引數的類型,明確條件式行為。我並不常這樣做,但在此處執行是因為我希望事件處理器將佇列的知識保留給自己。(我也可以使用雙重分派,但此處的情況似乎並不複雜到需要使用它。)
您可能也會注意到,我沒有針對拒絕的事件建立案例。正如我們將看到的,我將這些事件視為取代事件為 null 的取代事件來處理。
我將從最簡單的案例開始。基本的 Process(和反向)方法只是一個簡單的包裝器,允許我在需要時加入一些追蹤行為。
類別 EventProcessor...
private void BasicProcessEvent(DomainEvent e) { traceProcess(e); e.Process(); } private void BasicReverseEvent(DomainEvent e) { traceReverse(e); e.Reverse(); }
無序事件是最容易首先描述的。這些只是我們以非順序接收的常規事件,我們不會使用取代事件來建模它們。基本上,這些會在串流中插入一個新事件。處理器透過將無序事件與最後一個事件進行比較來測試它。
類別 EventProcessor...
private bool OutOfOrder(DomainEvent e) { if (LogIsEmpty()) return false; return (LastEvent().after(e)); } private DomainEvent LastEvent() { if (LogIsEmpty()) return null; return (DomainEvent) log[log.Count - 1]; } private bool LogIsEmpty() { return 0 == log.Count; }
為了讓這個範例好笑地簡單,我只按發生日期對事件進行排序。要實際執行此操作,可能只需轉換為更精細的時間點解析度就足夠了。有時其他因素可能會影響排序。
處理無序案例的步驟很簡單。
類別 EventProcessor...
private void ProcessOutOfOrder(DomainEvent e) { RewindTo(e); BasicProcessEvent(e); ReplayAfter(e); }
要倒轉,我只會選取日誌中晚於無序事件的所有事件。同樣,為了簡化,我將日誌(或至少快取)保存在記憶體中。
類別 EventProcessor...
private void RewindTo(DomainEvent priorEvent) { IList consequences = Consequences(priorEvent); for (int i = consequences.Count - 1; i >= 0; i--) BasicReverseEvent(((DomainEvent) consequences[i])); } private IList Consequences(DomainEvent baseEvent) { IList result = new ArrayList(); foreach (DomainEvent candidate in log) if (candidate.IsConsequenceOf(baseEvent)) result.Add(candidate); return result; }
class DomainEvent...
public bool IsConsequenceOf(DomainEvent other) { return (!ShouldIgnoreOnReplay && this.after(other)); }
如您所見,並非所有事件都選取在倒轉期間重新處理,儘管我沒有使用任何選擇性重播。基本上,我不會重新處理錯誤事件或已被拒絕的事件。
再次向前重播事件很簡單
類別 EventProcessor...
private void ReplayAfter(DomainEvent ev) { foreach (DomainEvent e in Consequences(ev)) BasicProcessEvent(e); }
現在讓我們繼續進行取代。
類別 EventProcessor...
private void ProcessReplacement(ReplacementEvent e) { if (e.Original.ShouldIgnoreOnReplay) throw new ProcessingException("Cannot replace event twice"); else if (null == e.Replacement) ProcessRejection(e); else if (e.HasPriorReplacement) ProcessPriorReplacement(e); else ProcessPriorOriginal(e); }
類別 ReplacementEvent...
public bool HasPriorReplacement { get { if (null == replacement) return false; else return original.after(replacement); } }
此處有幾個案例要處理。如果原始事件已標記為忽略,表示有處理錯誤,因為我們不應該拒絕已拒絕的事件。完成此操作後,我們有三個主要案例:取代為拒絕、取代事件早於原始事件,以及原始事件早於取代事件。
讓我們從拒絕案例開始,它是由一個空值替換事件指示的。我們在此倒帶到被拒絕的事件,拒絕它,反轉它,並向前重播。
類別 EventProcessor...
private void ProcessRejection(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); e.Original.Reject(); ReplayAfter(e.Original); }
class DomainEvent...
public bool after (DomainEvent other) { return this.CompareTo(other) > 0; } public void Reject() { _isRejected = true; } private bool _isRejected; public virtual bool ShouldIgnoreOnReplay { get { if (WasProcessingError) return true; return _isRejected; } }
拒絕會標記事件,因此它不會再次被處理或倒帶。
如果我們有先前的替換,我們會倒帶到替換,拒絕原始事件,處理替換並向前重播。
類別 EventProcessor...
private void ProcessPriorReplacement(ReplacementEvent e) { RewindTo(e.Replacement); e.Original.Reject(); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); }
對於先前的原始事件,我們會倒帶到原始事件,反轉並拒絕它,重播到替換,處理它,並繼續向前重播。
類別 EventProcessor...
private void ProcessPriorOriginal(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); e.Original.Reject(); ReplayBetween(e.Original, e.Replacement); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); } private void ReplayBetween(DomainEvent first, DomainEvent last) { IList eventsToReplay = new ArrayList(); foreach (DomainEvent e in log) { if (e.IsConsequenceOf(first) && last.after(e)) eventsToReplay.Add(e); } foreach (DomainEvent e in eventsToReplay) BasicProcessEvent(e); }
我遵循此處的慣例,即在成功處理事件之前不將其新增到記錄檔。如果記錄檔與應用程式狀態不在同一個交易中,這會很有用。如果它們全部都在一個交易中,我可以在到達分支點並向前重播(這將選取新的事件)後,將事件新增到記錄檔中。
範例:更新外部系統(C#)
對於簡單的事件溯源,在重播期間關閉外部通知就足夠了。然而,對於追溯事件,我們需要進一步進行偵測和修正。為在倒帶/重播期間執行此操作,我們將儲存所有產生的事件,並進行比較以確定需要修正的內容。然後,我們將假設一個簡單的自動修正案例,即傳送取消訊息和變更的新訊息。
我將繼續使用運送範例,說明美國的海關當局需要在任何通過加拿大進入美國港口的貨物時收到通知。
當貨物物件處理抵達事件時,會進行外部通知。
class Cargo...
public void HandleArrival(ArrivalEvent ev) { ev.priorCargoInCanada[this] = _hasBeenInCanada; if ("CA" == ev.Port.Country) _hasBeenInCanada = true; if (HasBeenInCanada && "US" == ev.Port.Country) { Registry.CustomsGateway.Notify(ev.Occurred, ev.Ship, ev.Port); ev.WasNotificationSent = true; } } private bool _hasBeenInCanada = false; public bool HasBeenInCanada {get { return _hasBeenInCanada;}}
為了反轉此事件,我們會在正向播放中記錄是否已傳送通知,並在傳送時再次傳送。
class Cargo...
public void ReverseArrival(ArrivalEvent ev) { _hasBeenInCanada = (bool) ev.priorCargoInCanada[this]; if (ev.WasNotificationSent) Registry.CustomsGateway.Notify(ev.Occurred, ev.Ship, ev.Port); }
我遵循的原則是,網域模型應該不了解事件處理中的重播邏輯。它知道如何為每個事件反轉其自己的狀態,但不在乎與外部系統對話的複雜性。
與外部系統對話的邏輯由閘道處理,在這種情況下,它是一群物件。

圖 1:一群物件組成閘道來處理追溯活動。
在此範例中,CustomsGatewayFront 是將面向網域的介面(由 ICustomsGateway 定義)轉換為實際訊息基礎架構的正常閘道。我們可以安全地忽略它執行的動作,我們假設如果我們呼叫它上面的方法,它會確保海關收到訊息。
有趣的事情發生在之前,由 CustomsGatewayBuffer 驅動,它包裝實際的 CustomsGatewayFront,實作相同的介面,但新增處理追溯重繞的能力。
因為許多追溯重繞是通用的,我可以將通用行為放入不同的類別 ReplayBuffer 中,讓 Customs Gateway Buffer 僅處理海關案例中特定的事項。重播緩衝區需要與事件處理器進行一些通訊,以處理重繞/重播程序中的各種階段。它也需要一些方式來調整最後不匹配的事件,這項工作在此案例中我指派給海關閘道緩衝區。在這兩種情況下,重繞緩衝區透過協作介面進行通訊,以保持其通用純度。
因此,這就是戲劇人物,現在讓我們進入動作。當我們執行時,我將從正常案例開始。海關閘道緩衝區透過建立更新事件並將其傳送至重播緩衝區來實作通知操作。
類別 CustomsGatewayBuffer...
public void Notify (DateTime arrivalDate, Ship ship, Port port) { CustomsNotificationEvent ev = new CustomsNotificationEvent(gateway, arrivalDate, ship, port); buffer.Send(ev); } ICustomsGateway gateway; ReplayBuffer buffer;
類別 CustomsNotificationEvent...
public CustomsNotificationEvent( ICustomsGateway gateway, DateTime arrivalDate, Ship ship, Port port) { this.gateway = gateway; this.arrivalDate = arrivalDate; this.ship = ship; this.port = port; }
如果重播緩衝區處於活動狀態,則它會處理事件,這會導致呼叫實際閘道前端。
類別 ReplayBuffer...
internal void Send(IGatewayEvent ev) { current.Add(ev); if (isActive) ev.Process(); }
類別 CustomsNotificationEvent...
public virtual void Process() { gateway.Notify(arrivalDate, ship, port) ; }
(我將在稍後說明重播緩衝區如何變得活躍,以及目前的清單是什麼。)
因此,如您所見,對於網域事件的活動處理,所發生的一切就是閘道緩衝區建立閘道事件,將其傳送至處理它的重播緩衝區,這會導致將原始呼叫套用至實際閘道。所有這些都是簡單委派的一種極為複雜的形式。
處理追溯事件時,這種怪異的回報會出現。重播緩衝區透過可重繞介面連接到事件處理器。
類別 ReplayBuffer...
IRewindable eventProcessor;
internal interface IRewindable { event EventProcessor.EventHandler RewindStarted, RewindFinished, ReplayFinished; }
此介面定義三個重要的事件。事件處理器在處理各種案例時會發出這些事件。以下是無序案例
類別 EventProcessor...
private void ProcessOutOfOrder(DomainEvent e) { RewindStarted(); RewindTo(e); RewindFinished(); BasicProcessEvent(e); ReplayAfter(e); ReplayFinished(); }
使用替換時,整體開始和結束是共通的,但每個案例需要特定放置來指出重繞何時結束。
類別 EventProcessor...
private void ProcessReplacement(ReplacementEvent e) { RewindStarted(); if (e.Original.ShouldIgnoreOnReplay) throw new ProcessingException("Cannot replace event twice"); else if (null == e.Replacement) ProcessRejection(e); else if (e.HasPriorReplacement) ProcessPriorReplacement(e); else ProcessPriorOriginal(e); ReplayFinished(); }
private void ProcessRejection(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); RewindFinished(); e.Original.Reject(); ReplayAfter(e.Original); }
private void ProcessPriorOriginal(ReplacementEvent e) { RewindTo(e.Original); BasicReverseEvent(e.Original); RewindFinished(); e.Original.Reject(); ReplayBetween(e.Original, e.Replacement); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); }
private void ProcessPriorReplacement(ReplacementEvent e) { RewindTo(e.Replacement); RewindFinished(); e.Original.Reject(); BasicProcessEvent(e.Replacement); ReplayAfter(e.Replacement); }
這些事件提供重播緩衝區正確的計時資訊,以正確處理追溯處理。我在這裡使用事件,因為任何事件處理器都可以有任意數量的重播緩衝區,無需知道這些物件,只要知道它們可以瞭解這三個事件即可。
現在讓我們轉到重播緩衝區,看看它如何處理這些。重播緩衝區是使用一堆清單建立的。
類別 ReplayBuffer...
internal ReplayBuffer (IRewindable processor, IAdjustable adjuster) { this.eventProcessor = processor; this.adjuster = adjuster; SubscribeToProcessorEvents(); sent = new ArrayList(); isActive = true; current = sent; } IList sent,rewound, replayed, current; IAdjustable adjuster; private bool isActive; private void SubscribeToProcessorEvents() { eventProcessor.RewindStarted += new EventProcessor.EventHandler(processor_RewindStarted); eventProcessor.RewindFinished += new EventProcessor.EventHandler(processor_RewindFinished); eventProcessor.ReplayFinished += new EventProcessor.EventHandler(processor_ReplayFinished); }
建立緩衝區時,緩衝區是處於活動狀態,表示它會處理傳送給它的任何事件。目前的清單變數會設定為三個核心清單之一,視我們在倒帶的哪個位置而定 - 當處於活動狀態時,它會指向傳送的清單。這讓我們記錄所有實際傳送的事件。它也會訂閱事件處理器上的所有相關事件。(現在不用擔心調整器,我們會在修正中處理它。)
在倒帶的開始,緩衝區會讓自己處於非活動狀態,以避免將事件傳遞到真實閘道,它也會交換目前的清單變數,指向一個新的清單,以便它可以擷取所有倒帶事件。
類別 ReplayBuffer...
private void processor_RewindStarted() { isActive = false; rewound = new ArrayList(); current = rewound; }
有了這個範例,你可能可以找出倒帶停止時會發生什麼事,但以防萬一...
類別 ReplayBuffer...
private void processor_RewindFinished() { replayed = new ArrayList(); current = replayed; }
所以到目前為止,很明顯最後一個事件會是最有趣的,因為我們需要在這裡計算不匹配清單。
類別 ReplayBuffer...
private void processor_ReplayFinished() { current = sent; DetermineChange(); isActive = true; rewound = null; replayed = null; } private void DetermineChange() { IList matchingEvents = new ArrayList(); foreach (IGatewayEvent ev in replayed) { if (rewound.Contains(ev)) matchingEvents.Add(ev); } foreach (IGatewayEvent ev in matchingEvents) { replayed.Remove(ev); rewound.Remove(ev); } adjuster.Adjust(rewound, replayed); }
演算法其實很簡單,我們只要找出哪些事件相符,然後將它們從兩個清單中移除。完成後,我們會有兩個不匹配事件的清單,我們會將它們傳遞給調整器處理。
類別 CustomsGatewayBuffer…
public void Adjust(IList oldEvents, IList newEvents) { foreach (CustomsNotificationEvent e in oldEvents) new CustomsCancellationEvent(e).Process(); foreach (CustomsNotificationEvent e in newEvents) e.Process(); } class CustomsCancellationEvent { private CustomsNotificationEvent original; public CustomsCancellationEvent(CustomsNotificationEvent original) { this.original = original; } public void Process() { original.Gateway.Cancel(original.ArrivalDate, original.Ship, original.Port); } }
在這個案例中,我讓海關閘道緩衝區透過傳送取消通知給所有結束後倒帶的事件,以及傳送新通知給新的事件,來處理調整。在我的幻想世界中,政府機關真的非常配合。在實際上,每個調整都需要個別考量,它們很容易變得非常複雜,需要人工介入。不過,事件清單應該能大幅協助釐清外部系統需要執行的動作。