WebSocket Real-time: ดูข้อมูล IoT แบบสดๆ

WebSocket Real-time: ดูข้อมูล IoT แบบสดๆ

ShowkhunWorkshop

WebSocket Real-time: ดูข้อมูล IoT แบบสดๆ

Branch: step-06-websocket Phase: Development — Real-time Layer Repo: kangana1024/showkhun-workshop


น้องๆ เคยเห็น dashboard ที่ค่าอุณหภูมิอัปเดตเองแบบสดๆ โดยไม่ต้องกด refresh ไหม? นั่นแหละ WebSocket! วันนี้เราจะมาทำให้ telemetry ที่ device ส่งเข้ามา (ผ่าน REST/MQTT ที่ทำไว้สองตอนก่อน) fan-out ไปทุกหน้าจอที่กำลังดูอยู่ ทันทีที่ข้อมูลถูกเก็บลง DB มาลุยกัน (ノ◕ヮ◕)ノ*:・゚✧

เกร็ดของจริง: Hub ในโค้ดนี้ใช้แนวคิดที่แตกต่างจาก tutorial ทั่วไป — มันรันบน goroutine เดียว ที่เป็นเจ้าของ state ทั้งหมด (map ของ client/room) แปลว่าบน hot path ไม่ต้องใช้ mutex lock เลย เดี๋ยวจะอธิบายว่าทำไมมันถึงทั้งเร็วและปลอดภัยกว่า


ก่อนเริ่ม: น้องๆ จะได้อะไรจาก workshop นี้

  • เข้าใจว่าทำไม WebSocket ดีกว่า HTTP polling (WHY ก่อน HOW)
  • Hub pattern แบบ single-goroutine ที่ไม่ต้อง lock บน hot path
  • Room system ให้ client subscribe เฉพาะ device: / group: ที่สนใจ
  • Fan-out telemetry จาก ingestion path ไปยัง client ที่ subscribe
  • จัดการ heartbeat ด้วย ping/pong และ deadline
  • back-pressure: ตัด client ที่ช้าทิ้ง กันไม่ให้ลากระบบล่ม

WHY: ทำไมต้อง WebSocket ไม่ใช่ Polling?

ลองนึกว่าน้องโทรหาเพื่อนทุก 1 วินาทีเพื่อถามว่า “มีข่าวอะไรไหม?” — นั่นคือ HTTP polling มันทำงานได้ แต่สิ้นเปลืองสุดๆ:

HTTP Polling (แบบเดิม):
Client → "มีข้อมูลไหม?" → Server → "ยัง"   (ทุก 1 วิ)
Client → "มีข้อมูลไหม?" → Server → "ยัง"   (ทุก 1 วิ)
Client → "มีข้อมูลไหม?" → Server → "มีแล้ว!" (ทุก 1 วิ)

WebSocket (แบบใหม่):
Client ── เชื่อมต่อครั้งเดียว ──────────────── Server
       ←←← ข้อมูลมาถึงเมื่อไหร่ ก็ push ทันที ←←←

WebSocket เหมือน โทรหากันแล้วไม่วางสาย ทั้งสองฝั่งส่งหากันได้ตลอด ไม่ต้องต่อสายใหม่ ผลคือ latency ต่ำลง, CPU ลดลง, UX ดีขึ้นมาก

graph LR
    D[📡 Device] -->|REST/MQTT| I[SensorService.Ingest]
    I -->|เขียน| DB[(InfluxDB)]
    I -->|fanOut| H[WebSocket Hub]
    H -->|device:room| C1[🖥️ Dashboard A]
    H -->|group:room| C2[📱 Dashboard B]

Hub Pattern แบบ Single-Goroutine (อธิบายแบบภาษาคน)

ลองนึกถึง พนักงานต้อนรับคนเดียว ที่จดทุกอย่างไว้ในสมุดของตัวเองคนเดียว:

  • Hub = พนักงานต้อนรับ ที่รู้ว่าใครอยู่ห้องไหน
  • Client = แขกแต่ละคน
  • Room = ห้องประชุม (device:sensor-001, group:building-a)
  • Broadcast = ประกาศเข้าห้องที่ตรงกัน → เรียกว่า fan-out

จุดสำคัญที่ออกแบบไว้ฉลาดคือ — มีพนักงานคนเดียวเป็นเจ้าของสมุด ใครจะแก้ข้อมูล (เพิ่ม client, เข้าห้อง, broadcast) ต้อง “ส่งโน้ตให้พนักงานคนนี้” ผ่าน channel แล้วเขาทำให้ทีละอย่าง — ไม่มีใครแย่งกันเขียนสมุดพร้อมกัน เลยไม่ต้องล็อกอะไรเลย

มาดู struct ของ Hub จาก backend/internal/ws/hub.go:

// Hub เป็นเจ้าของ set ของ client และ index ของ room membership
// การแก้ไขทั้งหมดเกิดบน goroutine ของ run loop ผ่าน channel
// register/unregister/broadcast
type Hub struct {
	log *zap.Logger

	// rooms map ชื่อห้อง → set ของ client ที่ subscribe ห้องนั้น
	rooms map[string]map[*Client]struct{}
	// clients คือ set ของ client ที่ต่ออยู่ทั้งหมด
	clients map[*Client]struct{}

	register    chan *Client
	unregister  chan *Client
	broadcast   chan envelope
	subscribe   chan subscription
	unsubscribe chan subscription
	query       chan roomQuery
	done        chan struct{}
}

หัวใจคือ event loop ที่ select รับงานจาก channel ทีละอย่าง:

// Run คือ event loop ของ hub return เมื่อ Stop ถูกเรียก
func (h *Hub) Run() {
	for {
		select {
		case c := <-h.register:
			h.clients[c] = struct{}{}

		case c := <-h.unregister:
			h.removeClient(c)

		case s := <-h.subscribe:
			room := h.rooms[s.room]
			if room == nil {
				room = make(map[*Client]struct{})
				h.rooms[s.room] = room
			}
			room[s.client] = struct{}{}
			s.client.rooms[s.room] = struct{}{}

		case s := <-h.unsubscribe:
			h.leaveRoom(s.client, s.room)

		case env := <-h.broadcast:
			for c := range h.rooms[env.room] {
				h.deliver(c, env.data)
			}

		case <-h.done:
			for c := range h.clients {
				close(c.send)
			}
			return
		}
	}
}

WHY single-goroutine? เพราะทุกการอ่าน/เขียน map เกิดบน goroutine เดียวกัน เราเลย ไม่ต้องใช้ mutex บน hot path เลย — ไม่มี lock contention, ไม่มี deadlock, ไม่มี race ทุก mutation ถูกทำให้เป็นคิวอัตโนมัติด้วย channel นี่คือสไตล์ Go แท้ๆ: “share memory by communicating”


Back-pressure: ตัด Client ช้าทิ้ง

ปัญหาคลาสสิกของ fan-out คือ — ถ้ามี client เน็ตอืดคนหนึ่งรับไม่ทัน มันจะถ่วงให้ทุกคน “รอ” ด้วยกัน Hub นี้แก้ด้วย deliver ที่ใช้ non-blocking send:

// deliver ดัน data เข้า send buffer ของ client client ที่ buffer เต็ม
// ถือว่า "ช้าเกินไป" แล้วตัดทิ้ง เพื่อปกป้อง hub — back-pressure จาก
// client ช้าตัวเดียวต้องไม่ทำให้ fan-out ไปหาทุกคนค้าง
func (h *Hub) deliver(c *Client, data []byte) {
	select {
	case c.send <- data:
	default:
		h.log.Warn("dropping slow websocket client", zap.String("remote", c.remote))
		h.removeClient(c)
		close(c.send)
	}
}

อุปมา: เหมือนสายพานอาหารใน buffet ถ้ามีคนหยิบช้ามากจนของกองเต็มหน้าเขา พนักงานจะ “ข้าม” คนนั้นไป ไม่หยุดสายพานทั้งเส้นเพื่อรอเขาคนเดียว — คนอื่นต้องได้กินต่อ


Broadcast: Serialize ครั้งเดียว ส่งทุกคน

เวลา broadcast เรา marshal JSON ครั้งเดียว แล้วส่ง bytes ชุดเดิมให้ทุก subscriber — ไม่ต้อง marshal ซ้ำต่อคน ประหยัด CPU มาก:

// Broadcast serialise msg ครั้งเดียวแล้วส่งให้ทุก client ใน msg.Room
// ถ้า buffer เต็มจะ drop message ทิ้ง ไม่ block caller (เช่น ingestion path)
func (h *Hub) Broadcast(msgType, room string, payload any) {
	data, err := json.Marshal(Message{Type: msgType, Room: room, Payload: payload})
	if err != nil {
		h.log.Error("marshal broadcast message", zap.Error(err))
		return
	}
	select {
	case h.broadcast <- envelope{room: room, data: data}:
	case <-h.done:
	default:
		h.log.Warn("broadcast buffer full; dropping message", zap.String("room", room))
	}
}

Message คือ envelope ที่ client จะได้รับ — รูปร่างของจริงเรียบมาก:

// Message คือ envelope ที่ broadcast ไปหา client
// Type บอกชนิด payload (เช่น "sensor_data"); Room บอกห้องต้นทาง
type Message struct {
	Type    string `json:"type"`
	Room    string `json:"room"`
	Payload any    `json:"payload"`
}

สำคัญ: Broadcast ใช้ default: ใน select แปลว่า ถ้า broadcast buffer เต็ม มัน “ทิ้ง message แล้วไปต่อ” ไม่ block — เพราะตัวที่เรียก Broadcast คือ ingestion path การเก็บข้อมูลลง DB ต้องไม่สะดุดเพราะ WebSocket ช้า (live view เป็น best-effort เสมอ)


Client & Room: ใครเข้าห้องไหนได้บ้าง

แต่ละ connection คือ Client หนึ่งตัว มี send buffer ของตัวเอง และ set ของ room ที่ subscribe โค้ดจาก backend/internal/ws/client.go:

// Client แทน WebSocket connection หนึ่งเส้น และ room ที่มัน subscribe
type Client struct {
	hub    *Hub
	conn   *websocket.Conn
	log    *zap.Logger
	remote string

	// send คือ outbound buffer ต่อ client write pump คอย drain มัน
	send chan []byte
	// rooms คือ set ของ room ที่ client นี้ subscribe ถูกแก้บน hub goroutine
	// เท่านั้น จึงไม่ต้อง lock
	rooms map[string]struct{}
}

room ที่ client เข้าได้ถูกจำกัดด้วย prefix เพื่อกันไม่ให้ client เข้าห้องภายในมั่วๆ:

const (
	roomDevicePrefix = "device:"
	roomGroupPrefix  = "group:"
)

// validRoom บอกว่า room นี้ client มีสิทธิ์เข้าไหม
func validRoom(room string) bool {
	if len(room) <= len(roomDevicePrefix) {
		return false
	}
	return hasPrefix(room, roomDevicePrefix) || hasPrefix(room, roomGroupPrefix)
}

// DeviceRoom คืนชื่อห้องของ live data ของ device ตัวหนึ่ง
func DeviceRoom(deviceID string) string { return roomDevicePrefix + deviceID }

// GroupRoom คืนชื่อห้องของ live data ของ device group
func GroupRoom(groupID string) string { return roomGroupPrefix + groupID }

client จะส่งคำสั่งเข้า/ออกห้องผ่าน control message รูปแบบนี้:

// clientCommand คือ control message ที่ client ส่งมาจัดการ subscription
type clientCommand struct {
	Action string `json:"action"` // "subscribe" | "unsubscribe"
	Room   string `json:"room"`   // เช่น "device:sensor-001" หรือ "group:<id>"
}

ReadPump & WritePump + Heartbeat

แต่ละ client มี 2 goroutine: ReadPump (อ่านคำสั่ง subscribe + ตอบ pong) และ WritePump (ดันข้อมูลออก + ส่ง ping เป็นจังหวะ)

     ┌──────────────────────────────────────────┐
     │              Client                      │
     │  ReadPump  ──── subscribe/unsubscribe ──► Hub
     │  WritePump ◄──── send channel ◄───────── Hub
     │  ping ticker ───────────────────────────► (ทุก 54 วิ)
     └──────────────────────────────────────────┘

ค่า timing ของจริง:

const (
	writeWait      = 10 * time.Second
	pongWait       = 60 * time.Second
	pingPeriod     = (pongWait * 9) / 10 // = 54 วิ ต้องน้อยกว่า pongWait
	maxMessageSize = 4 * 1024            // control message ตัวเล็กนิดเดียว
	sendBuffer     = 64
)

WritePump ส่ง ping เป็นจังหวะ ถ้า client ยังไม่ตอบ pong ภายใน pongWait deadline จะหมดแล้ว connection ถูกปิด:

// WritePump drain send buffer ออก connection และส่ง ping เป็นระยะ
func (c *Client) WritePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		_ = c.conn.Close()
	}()

	for {
		select {
		case data, ok := <-c.send:
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// hub ปิด channel แล้ว ส่ง close frame สวยๆ
				_ = c.conn.WriteMessage(websocket.CloseMessage,
					websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
				return
			}
			if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
				return
			}

		case <-ticker.C:
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

ส่วน ReadPump ตั้ง read deadline แล้วรีเซ็ตทุกครั้งที่ได้ pong กลับมา:

func (c *Client) ReadPump() {
	defer func() {
		c.hub.Unregister(c)
		_ = c.conn.Close()
	}()

	c.conn.SetReadLimit(maxMessageSize)
	_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error {
		return c.conn.SetReadDeadline(time.Now().Add(pongWait)) // ได้ pong → ต่ออายุ
	})

	for {
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			return
		}
		var cmd clientCommand
		if err := json.Unmarshal(data, &cmd); err != nil {
			c.sendError("invalid command: not valid JSON")
			continue
		}
		c.handleCommand(cmd)
	}
}

Tip จากพี่โชว์: pingPeriod = (pongWait * 9) / 10 คือ 54 วินาที เราส่ง ping ก่อน deadline 60 วิ จะหมดเสมอ เหมือนเช็คชื่อในห้องเรียน ถ้าไม่ตอบ pong ก็โดนลบออกจากระบบ — กัน connection ผีที่ค้างกินทรัพยากร


WebSocket Handler: ประตู Upgrade

ตอนนี้ใช้ github.com/gofiber/contrib/websocket (v1.3.4) ไม่ใช่ gofiber/websocket/v2 ตัวเก่า Handler ทำหน้าที่ guard upgrade แล้วผูก connection เข้า hub จาก backend/internal/handler/ws.go:

import (
	"github.com/gofiber/contrib/websocket"
	"github.com/gofiber/fiber/v2"
)

// Register mount route ของ WebSocket guard upgrade รันก่อน
// แล้ว reject non-WebSocket request ด้วย 426 กันไม่ให้ใช้ route นี้แบบ HTTP ธรรมดา
func (h *WSHandler) Register(r fiber.Router) {
	r.Use("/ws", func(c *fiber.Ctx) error {
		if websocket.IsWebSocketUpgrade(c) {
			return c.Next()
		}
		return fiber.ErrUpgradeRequired
	})
	r.Get("/ws", websocket.New(h.serve))
}

// serve คือ handler ต่อ connection contrib middleware รันมันใน goroutine ของมันเอง
// เราเปิด write pump อีก goroutine แล้ว block บน read pump เพื่อให้ connection
// (และ goroutine ทั้งคู่) ถูกเก็บกวาดพร้อมกัน
func (h *WSHandler) serve(conn *websocket.Conn) {
	client := ws.NewClient(h.hub, conn, h.log)
	h.hub.Register(client)

	go client.WritePump()
	client.ReadPump() // block จน connection ปิด
}

route จริงถูก mount ใต้ /api/v1 กลายเป็น /api/v1/ws


เชื่อม Ingestion เข้ากับ Hub (Fan-out)

นี่คือส่วนที่ “มัด” ทุกอย่างเข้าด้วยกัน sensor service ไม่รู้จัก WebSocket ตรงๆ แต่รู้จัก interface เล็กๆ ชื่อ Broadcaster จาก backend/internal/service/sensor.go:

// Broadcaster fan reading ที่ ingest สำเร็จออกไปหา subscriber สดๆ
// เป็น seam ไปยัง WebSocket hub และ optional (เป็น nil ได้)
// service จึงทำงานได้ปกติแม้ไม่ wire WebSocket
type Broadcaster interface {
	Broadcast(msgType, room string, payload any)
}

// LiveReading คือ payload ที่ broadcast ไปหา subscriber หลัง reading ถูกเก็บ
type LiveReading struct {
	DeviceID  string             `json:"device_id"`
	GroupID   string             `json:"group_id,omitempty"`
	Fields    map[string]float64 `json:"fields"`
	Tags      map[string]string  `json:"tags,omitempty"`
	Timestamp time.Time          `json:"timestamp"`
}

หลังเขียน InfluxDB สำเร็จ Ingest เรียก fanOut:

// fanOut broadcast reading ที่เก็บแล้ว ไปยัง device room
// (และ group room ถ้า device อยู่ใน group) no-op ถ้าไม่มี broadcaster
func (s *SensorService) fanOut(live LiveReading) {
	if s.broadcaster == nil {
		return
	}
	s.broadcaster.Broadcast("sensor_data", "device:"+live.DeviceID, live)
	if live.GroupID != "" {
		s.broadcaster.Broadcast("sensor_data", "group:"+live.GroupID, live)
	}
}

จุดเด็ด: Broadcaster เป็น interface ที่ *ws.Hub “บังเอิญ” implement ครบ (มี method Broadcast ตรง signature) เลยเสียบเข้ากันได้โดย service ไม่ต้อง import package ws เลย — แยก dependency สวยมาก และเพราะมันเป็น nil ได้ การรันเทส ingestion โดยไม่มี WebSocket ก็ทำได้สบาย

การประกอบร่างใน backend/cmd/server/main.go:

// WebSocket hub fan live sensor data ออกไปหา subscriber
// รันบน goroutine ของตัวเอง แล้ว wire เข้า ingestion service เป็น broadcaster
hub := ws.NewHub(log)
go hub.Run()
sensorService := service.NewSensorService(deviceRepo, influx, hub, cfg.Influx.Measurement)

เห็นไหมครับ — hub ตัวเดียวเสียบเป็น argument ที่ 3 ของ NewSensorService จบ ทั้ง REST และ MQTT telemetry ที่วิ่งผ่าน Ingest จะ fan-out อัตโนมัติ


ทดสอบ WebSocket กัน!

เชื่อมต่อด้วย websocat

brew install websocat   # หรือ cargo install websocat

websocat "ws://localhost:3000/api/v1/ws"

Subscribe เข้าห้อง

ส่ง control message (รูปแบบ clientCommand):

{ "action": "subscribe", "room": "device:sensor-temp-001" }

แล้วลองยิง telemetry เข้า device ตัวนั้น (ผ่าน REST หรือ MQTT จากสองตอนก่อน) — น้องจะเห็น message แบบนี้ push เข้ามาทันที:

{
  "type": "sensor_data",
  "room": "device:sensor-temp-001",
  "payload": {
    "device_id": "sensor-temp-001",
    "fields": { "temperature": 28.7, "humidity": 64.3 },
    "tags": { "location": "room-a" },
    "timestamp": "2026-03-26T10:00:05Z"
  }
}

ถ้าส่ง room ที่ไม่ขึ้นต้นด้วย device: หรือ group: จะได้ error กลับ:

{ "type": "error", "room": "", "payload": "invalid room: must start with device: or group:" }

ออกจากห้องก็แค่:

{ "action": "unsubscribe", "room": "device:sensor-temp-001" }

ตัวอย่าง TypeScript Client (Admin Panel)

สำหรับ admin panel เราเขียน client class ที่ auto-reconnect ให้ใช้งานได้จริง โครงตรงกับ wire format ด้านบน:

// lib/ws.ts
export type WSMessage = {
  type: string;
  room: string;
  payload: unknown;
};

export class IoTSocket {
  private ws: WebSocket | null = null;
  private reconnectDelay = 1000;
  private readonly maxReconnectDelay = 30000;
  private listeners = new Map<string, Set<(msg: WSMessage) => void>>();

  constructor(private url: string) {}

  connect(): void {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      this.reconnectDelay = 1000; // reset backoff เมื่อต่อสำเร็จ
    };

    this.ws.onmessage = (event) => {
      const msg: WSMessage = JSON.parse(event.data);
      this.listeners.get(msg.type)?.forEach((fn) => fn(msg));
    };

    this.ws.onclose = () => {
      // exponential backoff: 1s → 2s → 4s ... สูงสุด 30s
      setTimeout(() => this.connect(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
    };
  }

  // ส่ง control message ตามรูป { action, room }
  subscribe(room: string): void {
    this.send({ action: 'subscribe', room });
  }

  unsubscribe(room: string): void {
    this.send({ action: 'unsubscribe', room });
  }

  on(type: string, handler: (msg: WSMessage) => void): () => void {
    if (!this.listeners.has(type)) this.listeners.set(type, new Set());
    this.listeners.get(type)!.add(handler);
    return () => this.listeners.get(type)?.delete(handler);
  }

  private send(obj: unknown): void {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(obj));
    }
  }
}

// การใช้งาน:
// const sock = new IoTSocket('ws://localhost:3000/api/v1/ws');
// sock.connect();
// sock.subscribe('device:sensor-temp-001');
// const off = sock.on('sensor_data', (m) => {
//   const p = m.payload as { fields: { temperature: number } };
//   setTemp(p.fields.temperature);
// });

สังเกตไหม? reconnect ใช้ exponential backoff เริ่ม 1 วิ แล้ว 2, 4, 8 … สูงสุด 30 วิ เพื่อไม่ให้ client ไป hammer server ตอนมันล่ม — ฝั่ง server เองก็ไม่ต้องส่ง ping/pong เพราะ browser WebSocket จัดการ control frame ให้อัตโนมัติ


สรุป: วันนี้เราทำอะไรไปบ้าง

(づ。◕‿‿◕。)づ ยินดีด้วย! น้องๆ ผ่าน real-time layer มาแล้ว!
ส่วนประกอบ รายละเอียด
Library github.com/gofiber/contrib/websocket v1.3.4
Route GET /api/v1/ws (guard ด้วย 426 ถ้าไม่ใช่ upgrade)
Hub single-goroutine เป็นเจ้าของ state — ไม่ lock บน hot path
Message envelope { type, room, payload } (fan-out type = "sensor_data")
Client command { action, room } — action: subscribe/unsubscribe
Rooms device:<id> และ group:<id> (validate prefix)
Fan-out จาก SensorService.fanOut → device room + group room
Back-pressure client ที่ buffer เต็มถูกตัดทิ้ง ไม่ถ่วงคนอื่น
Heartbeat ping ทุก 54 วิ, pongWait 60 วิ

หัวใจของตอนนี้คือ single-goroutine ownership — แทนที่จะกระจาย mutex ไปทั่ว เราให้ goroutine เดียวเป็นเจ้าของ state ทั้งหมด สื่อสารกันผ่าน channel ผลคือโค้ดที่ทั้งเร็ว ปลอดภัยจาก race และอ่านง่ายกว่าเยอะ


Backend ครบชุดแล้ว!

ผ่านมาถึงตรงนี้ data layer หลักของ backend ครบแล้ว:

  1. Sensor Ingestion — REST รับ + เขียน InfluxDB แบบ blocking
  2. MQTT — device คุยผ่าน broker ป้อนเข้า ingestion path เดิม
  3. WebSocket — fan-out telemetry สดๆ ไปทุก dashboard

Next Step

ขั้นต่อไปเราจะไปตั้ง TICK Stack / time-series storage ให้แน่นขึ้น (retention, downsampling) ก่อนต่อยอดไป Telegraf pipeline และ alerting engine ในตอนหลังๆ