大家好这里是,白泽,这期分析一下 golang 开源高性能 websocket 库 gws。
视频讲解请关注📺B站:白泽talk
High IOPS Low Latency(高I/O,低延迟)
Low Memory Usage(低内存占用)
可以从下图看到: payload 越高,性能相比其他 websocket 库越是优越,如何做到?
这是 gws 的官方聊天室 demo 的架构图,绘制在这里帮助各位理解什么是全双工的通信模式。
WebSocket 与 HTTP 一样是应用层的协议,只需要 TCP 完成三次握手之后,Golang 的 net/http 库提供了 Hijack() 方法,将 TCP 套接字(活跃的一个会话),从 HTTP 劫持,此后 tcp 的连接将由 WebSocket 管理,脱离了 HTTP 协议的范畴。
而只要获取了 TCP 的套接字,何时发送和接受数据,都是由应用层决定的,传输层的 TCP 套接字只是被编排的对象(单工/双工),自然可以实现服务端主动发送数据。
为什么 payload 越高,性能相比其他 websocket 库越是优越?
原因:gws 中的读写操作,全部使用了缓冲池。
binaryPool = internal.NewBufferPool(128, 256*1024) // 缓冲池
读缓冲:每次读取是一次系统调用,因此可以读取一段数据,且用一个 offset 定位消费的位置,减少读取次数。
写缓冲:每次写入是一次系统调用,因此可以多次写入 buffer,统一 flush。
缓冲池:为不同大小的 buffer 提供了缓冲池,大段 buffer 的创建次数减少,减少 GC 压力 & 创建对象和销毁对象时间。
// NewBufferPool Creating a memory pool
// Left, right indicate the interval range of the memory pool, they will be transformed into pow(2,n)。
// Below left, Get method will return at least left bytes; above right, Put method will not reclaim the buffer.
func NewBufferPool(left, right uint32) *BufferPool {
var begin, end = int(binaryCeil(left)), int(binaryCeil(right))
var p = &BufferPool{
begin: begin,
end: end,
shards: map[int]*sync.Pool{},
}
for i := begin; i <= end; i *= 2 {
capacity := i
p.shards[i] = &sync.Pool{
New: func() any { return bytes.NewBuffer(make([]byte, 0, capacity)) },
}
}
return p
}
使用循环从 begin
到 end
,每次容量翻倍(乘以2),为每个容量创建一个 sync.Pool
实例。sync.Pool
是Go语言标准库中的一个类型,用于存储和回收临时对象。
使用缓冲池中的 buffer
从 conn
(网络连接)中读取和写入数据时,通常会执行以下步骤:
Get
方法从缓冲池中获取一个 buffer
。conn
读取数据,可以将 buffer
用作读取操作的目的地。buffer
,然后从 buffer
写入 conn
。buffer
放回缓冲池,以便重用。编写WebSocket库时,有几个关键点会影响其性能,尤其是在高并发场景下。
下面针对这些场景,部分给出一些 demo 写法(伪代码),可以从中提炼一些通用的项目设计方法:
package main
import (
"fmt"
"time"
)
func main() {
eventChan := make(chan string)
readyChan := make(chan bool)
// 模拟WebSocket连接
go func() {
time.Sleep(2 * time.Second)
eventChan <- "connected"
readyChan <- true
}()
// 事件处理循环
for {
select {
case event := <-eventChan:
fmt.Println("Event received:", event)
case <-readyChan:
fmt.Println("WebSocket is ready to use")
return
}
}
}
并发处理: 库如何处理并发连接和消息是影响性能的重要因素。使用goroutines或线程池可以提高并发处理能力。
消息压缩: 支持消息压缩(如permessage-deflate
扩展)可以减少传输数据量,但同时也会增加CPU的使用率,需要找到合适的平衡点。
内存管理: 优化内存使用,比如通过减少内存分配和重用缓冲区,可以提高性能并减少垃圾回收的压力。
var buffer = make([]byte, 0, 1024)
func readMessage(conn *websocket.Conn) {
_, buffer, err := conn.ReadMessage()
if err != nil {
// 处理错误
}
// 使用buffer中的数据
}
type WebSocketPool struct {
pool map[*websocket.Conn]struct{}
}
func (p *WebSocketPool) Add(conn *websocket.Conn) {
p.pool[conn] = struct{}{}
}
func (p *WebSocketPool) Remove(conn *websocket.Conn) {
delete(p.pool, conn)
}
func (p *WebSocketPool) Broadcast(message []byte) {
for conn := range p.pool {
conn.WriteMessage(websocket.TextMessage, message)
}
}
import "sync"
var pool = &WebSocketPool{
pool: make(map[*websocket.Conn]struct{}),
}
var mu sync.Mutex
func broadcast(message []byte) {
mu.Lock()
defer mu.Unlock()
for conn := range pool.pool {
conn.WriteMessage(websocket.TextMessage, message)
}
}
func handleConnection(conn *websocket.Conn) {
go func() {
for {
_, message, err := conn.ReadMessage()
if err != nil {
return // 处理错误
}
// 处理接收到的消息
}
}()
}
func (c *Conn) genFrame(opcode Opcode, payload internal.Payload, isBroadcast bool) (*bytes.Buffer, error) {
if opcode == OpcodeText && !payload.CheckEncoding(c.config.CheckUtf8Enabled, uint8(opcode)) {
return nil, internal.NewError(internal.CloseUnsupportedData, ErrTextEncoding)
}
var n = payload.Len()
if n > c.config.WriteMaxPayloadSize {
return nil, internal.CloseMessageTooLarge
}
var buf = binaryPool.Get(n + frameHeaderSize)
buf.Write(framePadding[0:])
if c.pd.Enabled && opcode.isDataFrame() && n >= c.pd.Threshold {
return c.compressData(buf, opcode, payload, isBroadcast)
}
var header = frameHeader{}
headerLength, maskBytes := header.GenerateHeader(c.isServer, true, false, opcode, n)
_, _ = payload.WriteTo(buf)
var contents = buf.Bytes()
if !c.isServer {
internal.MaskXOR(contents[frameHeaderSize:], maskBytes)
}
var m = frameHeaderSize - headerLength
copy(contents[m:], header[:headerLength])
buf.Next(m)
return buf, nil
}
错误处理和恢复: 健壮的错误处理和异常恢复机制可以防止个别连接的问题影响整个服务的性能。
测试和基准: 通过广泛的测试和基准测试来识别性能瓶颈,并根据测试结果进行优化。