摘要:在7月21號的全球架構師峰會深圳站上,美國科技公司Uber的高級工程師趙磊做了主題演講“Uber高可用消息系統構建”,本文分享了分布式系統中的各種錯誤處理的方案。
趙磊在7月21號的全球架構師峰會深圳站上,做了主題演講: Uber高可用消息系統構建, 對于這個熱門主題,高可用架構群展開了熱議,大家對分布式系統中的各種錯誤處理非常感興趣。Tim Yang特邀趙磊通過微信群,在大洋彼岸的硅谷給大家進一步分享。
分布式系統單點故障怎么辦?
non-sharded, stateless類型服務非常容易解決單點故障。 通常load balancer可以按照固定的時間間隔,去health check每個node, 當某一個node出現故障時,load balancer可以把故障的node從pool中排除。
很多服務的health check設計成簡單的TCP connect, 或者用HTTP GET的方式,去ping一個特定的endpoint。當業務邏輯比較復雜時,可能業務endpoint故障,但是health endpoint還能正常返回,導致load balancer無法發現單點故障,這種情況可以考慮在health check endpoint中增加簡單的業務邏輯判斷。
對于短時間的network故障,可能會導致這段時間很多RPC call failures。 在RPC client端通常會實現backoff retry。 failure可能有幾種原因:
TCP connect fail,這種情況下retry不會影響業務邏輯,因為Handler還沒有執行。
receive timeout, client無法確定handler是不是已經收到了request 而且處理了request,如果handler重復執行會產生side effect,比如database write或者訪問其他的service, client retry可能會影響業務邏輯。
對于sharded service,關鍵是如何找到故障點,而且將更新的membership同步到所有的nodes。下面討論幾種sharding的方案:
將key space hash到很多個小的shard space, 比如4K個shards。 通過zookeeper (distributed mutex) 選出一個master,來將shard分配到node上,而且health check每一個node。當遇到單點故障時,將已經assigned的shards轉移到其他的nodes上。 因為全局只有一個single master, 從而保證了shard map的全局一致。當master故障時,其他的backup node會獲得lock成為Master
Consistent hashing方式。consistent hashing 通常用來實現cache cluster,不保證一致性。 因為每個client會獨立health check每一個node, 同時更新局部的membership。 在network partition的情況或者某一個node不停的重啟, 很可能不同的client上的membership不一致,從而將相同的key寫在了不同的node上。 當一致性的需求提高時,需要collaborative health check, 即每個node要monitor所有其他node的health。 Uber在這里使用的是gossip protocol,node之間交換health check的信息。
大面積故障怎么辦?
大面積故障時,比如交換機故障(rack switch failure),可用的機器不足以處理所有的請求。 我們盡可能做的就是用50%的capacity 處理50%的請求或者50%用戶的所有請求。而盡量避免整個服務故障。 當設計一個服務的時候,它的throughput應該是可linear scale的。
在同樣的CPU占用情況下,1個機器應該處理100個請求,那么5個機器應該可以處理500個請求。
而且在同樣的機器數量下,20%的CPU可以處理200個請求,那么60%的CPU應該可以處理3倍即600個請求。
后者是很難實現的,而且當CPU越高的時候,服務的throughput并不是線性的。 通常在80%CPU以上的情況,throughput會下降非常快。 隨著CPU使用增加,request的latency也會提高。 這對上下游的服務可能都是一個挑戰,可能會導致cascade failure。
對于nodejs或者java nio一類的async IO框架來說,另外一個問題就是event loop lag。 這兩者可能導致connection數量增加。下面舉兩個例子
有些RPC transport支持pipelining但不支持multiplexing (out of order responses), pipelining是指在同一個TCP連接上可以連續發出Req1, Req2, Req3, Response1, Response2, Response3,即Response的順序必須和Request的順序是一致。Req1如果需要很長時間,Req2和3就都不能返回。一個Request如果占用太長時間,會導致后面的很多個Request timeout。RPC client通常也會限制在一個TCP connection上面的max pending requests。但timeout發生,或者max pending requests情況下,client會主動創建新的connection。
event loop lag 是指程序占用太長時間執行連續的CPU intensive任務。 只有當任務結束時,event loop才會handle IO events,比如從socket上面讀數據。否則收到的數據只能保存在kernel 的TCP buffer里,通常這個buffer size小于64KB。當buffer滿時(而且service又很長時間沒有讀buffer),socket的遠端就不能發送更多的數據。這時也會導致遠端的transport error。同樣的,client會主動創建新的connection,當connection增加到預設的fd limit時,service就不能繼續accept新的TCP connection了,其實是不能open新的文件了。而且,絕大部分的程序沒有測試過達到fd limit的場景。很多API需要open file, 比如logging和core dump. 所以,一旦達到fd limit, 就像out of memory一樣,將很難recover,只能crash process. 而這時正是過載的時候,重啟實際上減少了capacity。 任何crash在過載的情況下只會更糟。
facebook在這防止過載上做的很好,在C++實現的thrift server上,有一個或者多個threads只負責accept TCP connections. 你可以指定最多的connections for thrift calls。 這個connection limit是遠小于fd limit, 當connection太多時,thrift server可以fail fast。所以,這種情況下可以讓service能一直保持在max qps。
整個數據中心掛掉怎么辦?
在Uber的場景中,如果rider已經在一個trip上了,我們通產會等trip結束后才把rider遷移到其他的數據中心,我們叫做 soft failover 。
否則需要hard failover,我們會把DNS指向其他的數據中心。 而且用戶的DNS服務器很可能在一段時間內還是cache以前的ip,而且這個cache的時間是基本沒辦法控制的,所以我們會在load balancer上返回HTTP redirect,這樣手機的客戶端收到后會立即轉向新的備份數據中心。
驚群問題(thundering herd), 很多服務在provision的時候根據平常的QPS預留了很少的容量空間,當數據中心或者load balancer重啟的時候,如果所有的客戶端同時發起請求,這時的QPS可以是平時的很多倍。 很可能導致大部分請求都失敗。一方面需要在客戶端實現exponential backoff, 即請求失敗后retry的間隔時間是增長的,比如1秒,5秒,20秒等等。另外在load balancer上實現rate limiting或者global blackhole switch, 后者可以有效的丟掉一部分請求而避免過載,同時盡早觸發客戶端的backoff邏輯。
如果大家用AWS或者其他云服務的話,AWS的一個region通常包括幾個數據中心。各個數據中心甚至在相鄰的介個城市,有獨立的空調系統和供電。
數據中心之間有獨立的網絡 high throughput low latency, 但是在region之間的網絡通常是共有的 high throughput high lantecy
整個region掛掉很少發生。可以把服務部署在多個可用區(Availability Zone)來保證高可用性。
Q &A
Q1:health check endpoint中實現簡單的業務邏輯,這個意思是load balancer中有業務邏輯檢查的插件么?這樣load balancer會不會很重啊,可以詳細說一下么?
load balancer仍然是HTTP GET, health check 沒有額外的開銷,但是服務本身處理health的方式不同,可加入業務邏輯相關的檢查 比如是不是能夠訪問數據庫。
Q2:region切換時,用戶的數據是怎么遷移的?
這個是個很好的問題,Uber采取的是個非常特別的方法。 realtime系統會在每次用戶state change。state change的時候把新的state下載到手機上,而且是加密的。當用戶需要遷移到新的數據中心的時候,手機需要上傳之前下載的state,服務就可以從之前的state開始,但是non-realtime系統 比如用戶數據是通過sql replication來同步的。是Master-master。而且Uber在上層有個數據抽象,數據是基本上immutable的 append-only 所以基本不存在沖突。
Q3:如果是req timeout,但另外一邊已經執行成功了,這時候重試,那不就是產生了兩次數據?特別是insert這種類型的。
是的,如果是GET類型的請求可以retry, 但是POST類型的請求 那么只能在conn timeout時可以安全的retry。 但是receive timeout不能重試。(Tim補充看法:對于POST請求,如果service實現了冪等操作也是可以retry)。 有些類型的數據可以自動merge比如set和map
Q4:那receive timeout,這種情況下,只能通過merge或者沖突對比解決?
恩 是的。 需要在邏輯層判斷是不是能夠retry。 這個我建議在更上層實現, 比如在消息系統中,全程不retry 就可以保證at most once delivery, 如果需要保證at least once delivery 需要加入數據庫和client dedupe
Q5:大面積故障時Uber用什么手段來控制只處理部分用戶請求?
我們實現了一些rate limiting 和 circuit breaking的庫,但是這時針對所有請求的。 我們現在還沒有做到只處理某些用戶的請求。
Q6:“將key space hash到相對小的shard space, 因為全局只有一個single master, 從而保證了shard map的全局一致” 這個方案每次計算shard node的時候,必須先詢問下master么?
是的。 在client端有一個shard map的cache, 每隔幾秒鐘可以refresh, 如果是復雜的實現,則可以是master 推送shardmap change。
Q7:多個機房的數據是sharding存儲(就是每個機房只存儲一部分用戶數據),還是所有機房都有所有用戶全量數據?
Uber現在的做法是每個機房有所有用戶的數據。 facebook的做法是一個機房有一部分用戶的數據。
Q8:那多個機房的數據同步采用什么方案?
facebook用的就是mysql replication,有些細節我不清楚。 Uber還沒有跨數據中心的replication,但是我們考慮買riak的enterprise服務,可以支持跨數據中心的 replication。 對于sql數據 我們就2個方案:大部分用戶數據還是在postgresql里的(沒有sharding, 是個single node),因為Uber起家的時候就在postgres上,這個數據是用postgres原生支持的replication, 另外有個mysql的, mysql存的是trip的數據, 所以是append only而且不需要merge的。 這個我還需要確認是不是每個數據中心里面有全量的數據還是只有本地產生的trip數據。
Uber數據抽象做的比較好,數據分為3類:
最小的 realtime的,跟ongoing trip的個數成正比。 正在遷移到riak
比較大 非realtime的,跟user個數成正比。在postgresql里面 用postgresql的relication,正在遷移到mysql,用mysql的replication
最大 非realtime的,跟trip個數成正比。 在MySQL里面有很多partition,一個用戶在一個partitionl里面,一個partition一個全局的master,寫都去master。 而且Partition很少遷移,所以當seconary變成Master時,可能沒有用戶之前的trip的信息,replication是offline的 好像是通過backup-restore實現的。
Q9: 那如何實現“每個機房都有全量數據”的?
不是實時的,是在應用層實現的,而且現在還沒開始大規模使用。 另外問下riak 有同學在用么? Uber 的很多系統去年就開始遷移到riak上了,因為riak是保證availability的 。將來在Uber會是重點
Q10:Uber的消息系統是基于nodejs的嗎?客戶端長鏈接的性能和效率方面如何優化?
是基于nodejs的。我們沒有特別優化性能,不過stress test看起來2個物理機可以保持800K連接
Q11:Uber消息系統協議自己DIY嗎? 是否基于TLS? PUSH消息QPS能達到多少?
是的,基于HTTPS。 具體QPS我不太記得了。
Q12:riak的性能如何?主要存儲哪些類型的數據呢?存儲引擎用什么?raik的二級索引有沒有用到呢?
riak性能我沒測試過,跟數據類型和consistency level都有關系。 可能差別比較大。 我們現在用的好像是leveldb
Q13:應用層實現多機房數據一致的話,是同時多寫嗎? 這個latency會不會太長?
sql現在都是用在non-realtime系統里面,所以latency可能會比較長
Q14:Uber rpc用的什么框架,上面提到了Thrift有好的fail fast策略,Uber有沒有在rpc框架層面進行fail fast設計?
Uber在RPC方面還剛開始。 我們一直是用http+json的,最近在朝tchannel+thrift發展, tchannel是一個類似http2.0的transport,tchannel 在github上能找到。
我們的nodejs thrift 是自己實現的,因為apache thrift在node上做的不是很好,thrift的實現叫做thriftify https://github.com/Uber/thriftify
正好推薦下我的開源項目哈。 在thrift server上我們沒有做fail fast, 如何保護是在routing service中實現的。
Q15:Uber走https協議,有沒有考慮spdy/http2.0之類的呢?在中國網速狀況不是很好的,Uber有沒有一些https連接方面的優化措施?
正在考慮遷移到HTTP2.0,這個主要是手機端有沒有相應的client實現。 server端我們用的是nginx,nginx上有個experiemnt quality的extension可以支持spdy。 我們還考慮過用facebook的proxygen https://github.com/facebook/proxygen,proxygen支持spdy。 我在facebook的chat service是用proxygen實現的,而且facebook 幾十萬臺PHP server都在proxygen上,所以可以說是工業級強度的基礎設施,不過build起來要花點時間。
Q16:為了避免服務過載和cascade failure,除了在服務鏈的前端采用一些fail fast 的設計,還有沒有其它的實踐作法,比如還是想支持一部分用戶或特定類型的請求,采用優先級隊列等。 就這個問題,Uber,facebook在服務化系統中還有沒有其它技術實踐?另外出現大規模服務過載后的恢復流程方面,有沒有碰到什么坑或建議?
“比如還是想支持一部分用戶或特定類型的請求” 這個其實比較難實現 因為當服務過載的時候 在acceptor thread就停止接受新的connection了,那就不知道是哪個用戶的請求 。這個需要在應用層實現,比如feature flag可以針對一些用戶關掉一些feature。 我發現有個很有用的東西就是facebook有個global kill switch,可以允許x%的流量,這個當所有service一起crash 重啟的時候比較有用。