1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
| package handler
import ( "encoding/json" "log" "time"
"github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx"
"github.com/nrbackback/zero-chatroom/chat/internal/types" )
type Hub struct { clients map[int]*Client
broadcast chan Msg }
type Msg struct { Msg string FromID int ToIDs []int Time int64 }
type Client struct { ID int conn *websocket.Conn send chan Msg }
var h *Hub
func InitHub() { h = &Hub{ broadcast: make(chan Msg), clients: make(map[int]*Client), } }
func RunHub() { for { select { case message := <-h.broadcast: for _, client := range h.clients { select { case client.send <- message: default: close(client.send) delete(h.clients, client.ID) } } } } }
func register(c *Client) { h.clients[c.ID] = c }
func unregister(c *Client) { delete(h.clients, c.ID) close(c.send) }
var maxMessageSize = int64(512) var pongWait = 60 * time.Second
func (c *Client) readPump() { logx.Errorw("test close writer error", logx.Field("error", "fdfsdfds")) defer func() { unregister(c) c.conn.Close() }() c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil })
for { _, message, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("error: %v", err) } break } var req types.WsRequestItem if err := json.Unmarshal(message, &req); err != nil { logx.Errorw("unmarshal message when read error", logx.Field("message", string(message)), logx.Field("error", err)) continue } h.broadcast <- Msg{ Msg: req.Message, FromID: c.ID, ToIDs: req.ToID, Time: time.Now().Unix(), } } }
var pingPeriod = (pongWait * 9) / 10 var writeWait = 10 * time.Second
func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case message, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } w, err := c.conn.NextWriter(websocket.TextMessage) if err != nil { logx.Errorw("NextWriterd error", logx.Field("error", err)) continue } resp := types.WsResponseItem{ Message: message.Msg, FromID: message.FromID, Time: message.Time, } v, _ := json.Marshal(resp) w.Write(v) if err := w.Close(); err != nil { logx.Errorw("close writer error", logx.Field("error", err)) continue } case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { logx.Errorw("write ping message error", logx.Field("error", err)) continue } } } }
|