รับ Sensor Data เข้า InfluxDB แบบไม่กลัวตาย
รับ Sensor Data เข้า InfluxDB แบบไม่กลัวตาย
Branch:
step-04-sensor-ingestionPhase: Development — Data Pipeline Repo: kangana1024/showkhun-workshop
เคยสังเกตไหมครับว่าเวลา sensor ส่งข้อมูลมา บางทีมันถล่มมาพร้อมกันเป็นร้อยตัว ถ้าเรารับแบบไม่มีระบบ มันก็เหมือนเปิดประตูบ้านทิ้งไว้แล้วปล่อยใครก็ไม่รู้เดินเข้ามาได้เลย วันนี้เราจะมาสร้าง Sensor Ingestion ที่รับข้อมูลได้เร็ว เช็คก่อนรับ และถ้า DB ล่ม คนส่งจะ “รู้ทันที” ไม่ใช่เงียบหายไป (ʘ‿ʘ)
เกร็ดจากของจริง: ในโค้ด branch นี้เราเลือกใช้ InfluxDB WriteAPIBlocking ไม่ใช่ buffered WriteAPI แบบ fire-and-forget — เพราะอยากให้ write error เด้งกลับมาที่ HTTP response แบบ synchronous ไม่ใช่หายไปใน goroutine เงียบๆ เดี๋ยวจะอธิบายว่าทำไมตัดสินใจแบบนี้
ก่อนเริ่ม: น้องๆ จะได้อะไรจาก workshop นี้
- เข้าใจว่าทำไม ingestion ต้องแยกเป็น handler → service → influx (WHY ก่อน HOW)
- สร้าง REST endpoint รับ sensor data ทั้ง single (
/sensors/data) และ batch (/sensors/data/batch) - validate payload ด้วย go-playground/validator v10 + custom rule
alphanumdash - เช็คทุก reading กับ device registry ก่อนเขียน (unknown/disabled = reject)
- เขียนลง InfluxDB 2.x แบบ blocking ที่ error เด้งกลับ HTTP จริง
- กัน device ซน ด้วย rate limiter แบบ token-bucket ต่อ device
WHY: ทำไมต้องแยก Layer ไม่ยัดรวมกัน?
ลองนึกภาพโรงคัดแยกพัสดุ — พัสดุที่เข้ามาไม่ได้โยนเข้าเครื่องตรงๆ ต้องผ่านสายพาน เช็คที่อยู่ → ชั่งน้ำหนัก → แปะ label → จัดเข้าช่อง ถ้าข้ามขั้นแล้วยัดเข้าเครื่องเลย เครื่องพัง
Sensor data ก็เหมือนกัน เราแบ่งความรับผิดชอบเป็น 3 ชั้นชัดๆ:
graph LR
A[📡 REST Client] -->|POST JSON| B[SensorHandler]
B -->|parse + validate + rate limit| C[SensorService]
C -->|เช็ค device registry| D[(MongoDB)]
C -->|เขียน point| E[(InfluxDB 2.x)]
- Handler = พนักงานต้อนรับ: parse JSON, validate, เช็ค rate limit แล้วส่งต่อ ไม่มี business logic
- Service = สมอง: เช็ค device จาก registry, สร้าง InfluxDB point, สั่งเขียน
- Influx wrapper = แขนที่ยื่นไปแตะ DB: เขียน point พร้อม timeout
ข้อดีคือถ้า spec ของ InfluxDB เปลี่ยน เราแก้แค่ชั้น service/influx ไม่ต้องไปยุ่งกับ handler เลย
Domain Model — กำหนด “รูปร่าง” ของข้อมูลก่อน
ก่อนเขียน logic เราต้องรู้ก่อนว่า reading หน้าตาเป็นยังไง โค้ดจริงอยู่ที่ backend/internal/model/sensor.go:
// SensorReading คือ measurement หนึ่งจุดที่ device รายงานมา
// เขียนลง InfluxDB (ไม่ใช่ Mongo) จึงไม่มี bson tag
type SensorReading struct {
DeviceID string `json:"device_id" validate:"required,min=3,max=128,alphanumdash"`
Fields map[string]float64 `json:"fields" validate:"required,min=1,max=64,dive,keys,required,min=1,max=64,endkeys"`
Tags map[string]string `json:"tags" validate:"omitempty,max=16,dive,keys,required,min=1,max=64,endkeys,max=128"`
Timestamp *time.Time `json:"timestamp" validate:"omitempty"`
}
// SensorBatchRequest คือ payload ของ endpoint แบบ batch
type SensorBatchRequest struct {
Readings []SensorReading `json:"readings" validate:"required,min=1,dive"`
}
จุดที่ออกแบบไว้ดีมาก:
Fieldsเป็นmap[string]float64— ตัวเลขล้วน ไม่ใช่interface{}มั่วๆ ค่าวัด (temperature, humidity) ต้องเป็นตัวเลขเสมอTagsเป็นmap[string]string— label ที่ cardinality ต่ำ ไว้ filter ใน InfluxDB
อุปมา fields vs tags:
Fieldsคือ ตัวเลขบนสเกล (น้ำหนัก, อุณหภูมิ) ส่วนTagsคือ ป้ายติดข้างกล่อง (ห้องไหน, ชั้นไหน) ถ้าเอาตัวเลขที่ไม่ซ้ำเลย (เช่น timestamp) ไปยัดเป็น tag จะเจอกับดัก high-cardinality ที่ทำให้ time-series ช้าลงมหาศาล
Timestampเป็น pointer (*time.Time) — ถ้าnilแปลว่า device ไม่ได้ส่งเวลามา เราใช้เวลา server แทน ไม่บังคับ device ทุกตัวต้องมีนาฬิกาตรงเป๊ะ
สังเกต alphanumdash ด้วย — นั่นคือ custom validation rule ที่เราลงทะเบียนเอง อนุญาตแค่ [A-Za-z0-9_-] ใน device id เพื่อกันอักขระแปลกๆ หลุดไปเป็น tag
Sensor Handler — ประตูรับข้อมูล (ต้องบาง)
Handler ที่ดีต้องบาง: รับ → validate → rate limit → ส่งต่อ จบ โค้ดจาก backend/internal/handler/sensor.go:
// Ingest จัดการ POST /sensors/data สำหรับ reading เดียว
func (h *SensorHandler) Ingest(c *fiber.Ctx) error {
var r model.SensorReading
if err := c.BodyParser(&r); err != nil {
return httpx.Error(c, fiber.StatusBadRequest, "invalid JSON body")
}
if err := validate.Struct(r); err != nil {
return writeValidationError(c, err)
}
// กันต่อ device ก่อนทำงานหนัก
if !h.limiter.Allow(r.DeviceID) {
return rateLimited(c)
}
if err := h.svc.Ingest(c.UserContext(), r); err != nil {
return mapSensorError(c, err)
}
return c.Status(fiber.StatusAccepted).JSON(fiber.Map{
"data": fiber.Map{"accepted": 1},
})
}
เรียบง่ายดีไหมครับ — parse, validate, เช็ค limiter, delegate, return 202 Accepted ไม่มี business logic ซ่อนอยู่เลย
จุดสำคัญที่หลายคนพลาดคือ error mapping — service โยน error เป็น sentinel (ErrNotFound, ErrDeviceDisabled) แล้ว handler แปลเป็น HTTP status ที่ถูกต้อง:
// mapSensorError แปลง ingestion error เป็น HTTP response
func mapSensorError(c *fiber.Ctx, err error) error {
switch {
case errors.Is(err, service.ErrNotFound):
return httpx.Error(c, fiber.StatusNotFound, "device is not registered")
case errors.Is(err, service.ErrDeviceDisabled):
return httpx.Error(c, fiber.StatusForbidden, "device is disabled")
default:
return httpx.Error(c, fiber.StatusBadGateway, "failed to persist sensor data")
}
}
เห็นบรรทัด default ไหม? ถ้าเขียน InfluxDB ไม่สำเร็จ จะตอบ 502 Bad Gateway — เพราะปัญหาอยู่ที่ระบบหลังบ้าน (DB) ไม่ใช่ผู้ส่ง นี่คือเหตุผลที่เราใช้ blocking write: เพื่อให้ error เด้งกลับมาถึงตรงนี้ได้
Batch Ingestion — เร็ว แต่กันการแอบยัด
endpoint แบบ batch รับได้สูงสุด 500 readings (กำหนดด้วย maxBatchSize) จุดที่น่าสนใจคือ การเก็บ rate limit ของ batch:
// IngestBatch จัดการ POST /sensors/data/batch
func (h *SensorHandler) IngestBatch(c *fiber.Ctx) error {
var req model.SensorBatchRequest
if err := c.BodyParser(&req); err != nil {
return httpx.Error(c, fiber.StatusBadRequest, "invalid JSON body")
}
if len(req.Readings) > h.maxBatchSize {
return httpx.Error(c, fiber.StatusRequestEntityTooLarge, "batch exceeds the maximum size")
}
if err := validate.Struct(req); err != nil {
return writeValidationError(c, err)
}
// นับ reading ต่อ device แล้วชาร์จ bucket ของแต่ละ device ทีละ reading
// ถ้า device ไหนเกิน limit "ทั้ง batch ถูก reject" — กันการแอบยัด
// reading เกินโควต้าเข้ามาในก้อน batch
perDevice := make(map[string]int, len(req.Readings))
for _, r := range req.Readings {
perDevice[r.DeviceID]++
}
for deviceID, n := range perDevice {
for i := 0; i < n; i++ {
if !h.limiter.Allow(deviceID) {
return rateLimited(c)
}
}
}
accepted, err := h.svc.IngestBatch(c.UserContext(), req.Readings)
if err != nil {
return mapSensorError(c, err)
}
return c.Status(fiber.StatusAccepted).JSON(fiber.Map{
"data": fiber.Map{"accepted": accepted},
})
}
ทำไมต้องเก็บ token ต่อ reading ในก้อน batch? เพราะถ้าเก็บแค่ครั้งเดียวต่อ request device ก็จะแอบยัด 500 readings เข้ามาในหนึ่ง batch เพื่อหนี rate limit ได้ — เราเลยชาร์จ bucket ของแต่ละ device เท่ากับจำนวน reading ที่มันส่งมาจริง
Sensor Service — สมองที่เช็คก่อนเขียน
นี่คือหัวใจ โค้ดจาก backend/internal/service/sensor.go — สังเกตว่า service ไม่ผูกกับ database package ตรงๆ แต่ผูกกับ interface เล็กๆ แทน เพื่อให้ mock ตอนเทสได้ง่าย:
// PointWriter คือ subset ของ Influx wrapper ที่ ingestion ต้องใช้
// แค่นี้ก็พอ ไม่ต้องลาก database package ทั้งก้อนเข้ามา
type PointWriter interface {
WritePoints(ctx context.Context, points ...*write.Point) error
}
// deviceLookup คือ slice อ่านอย่างเดียวของ device repo ไว้เช็ค registry
type deviceLookup interface {
GetByDeviceID(ctx context.Context, deviceID string) (*model.Device, error)
}
เทคนิค design: “consumer-defined interface” — ฝั่งที่ใช้งาน (service) เป็นคนนิยาม interface เอง ว่าตัวเองต้องการ method อะไรบ้าง ฝั่ง Influx/Mongo แค่ “บังเอิญ” มี method ตรง ก็เสียบใช้ได้เลย แยกส่วนกันสวยมาก
ตัว Ingest เรียก buildPoint ที่ทำงานหนักจริง:
// buildPoint เช็ค reading กับ registry แล้วแปลงเป็น InfluxDB point
// ที่เติม tag จาก registry ให้ (type, group)
func (s *SensorService) buildPoint(ctx context.Context, r model.SensorReading) (*write.Point, error) {
dev, err := s.devices.GetByDeviceID(ctx, r.DeviceID)
if err != nil {
if errors.Is(err, repository.ErrNotFound) {
return nil, ErrNotFound // device ไม่มีในระบบ → 404
}
return nil, err
}
if !dev.Enabled {
return nil, ErrDeviceDisabled // device ถูกปิด → 403
}
ts := time.Now().UTC()
if r.Timestamp != nil {
ts = r.Timestamp.UTC()
}
pt := write.NewPointWithMeasurement(s.measurement)
// tag ที่ดึงจาก registry — cardinality ต่ำ
pt.AddTag("device_id", dev.DeviceID)
pt.AddTag("device_type", string(dev.Type))
if dev.GroupID != nil {
pt.AddTag("group_id", dev.GroupID.Hex())
}
// tag ที่ผู้ส่งให้มา (ผ่าน validate เรื่องขนาด/รูปแบบแล้ว)
for k, v := range r.Tags {
pt.AddTag(k, v)
}
// numeric fields
for k, v := range r.Fields {
pt.AddField(k, v)
}
pt.SetTime(ts)
return pt, nil
}
ทำไมต้องเติม tag จาก registry เอง? เพราะถ้าให้ device รายงาน device_type หรือ group_id มาเอง มันโกหกได้ — แต่ถ้าเราดึงจาก registry (MongoDB) แทน ข้อมูลนี้จะเชื่อถือได้เสมอ และ query ทีหลังก็ filter ตาม type/group ได้โดยไม่ต้อง join กลับไป Mongo
ส่วน IngestBatch validate ทุก reading ก่อน แล้วค่อยเขียนทีเดียว — ถ้ามีตัวไหน reference device ที่ไม่มีจริงหรือถูกปิด ทั้ง batch ถูก reject แบบ atomic:
// IngestBatch validate ทุก reading กับ registry ก่อน "แล้วค่อยเขียน"
// ถ้ามี device ที่ไม่รู้จัก/ถูกปิดอยู่ในก้อน ทั้ง batch ถูก reject ทั้งหมด
func (s *SensorService) IngestBatch(ctx context.Context, readings []model.SensorReading) (int, error) {
points := make([]*write.Point, 0, len(readings))
for i := range readings {
pt, err := s.buildPoint(ctx, readings[i])
if err != nil {
return 0, fmt.Errorf("reading %d (%s): %w", i, readings[i].DeviceID, err)
}
points = append(points, pt)
}
if err := s.writer.WritePoints(ctx, points...); err != nil {
return 0, err
}
return len(points), nil
}
InfluxDB Wrapper — ทำไมเลือก Blocking Write?
นี่คือการตัดสินใจ design ที่สำคัญที่สุดของตอนนี้ โค้ดจาก backend/internal/database/influx.go:
// Influx ห่อ client ของ InfluxDB ด้วย blocking write API ที่ผูกกับ org/bucket
// ใช้ WriteAPIBlocking (ไม่ใช่ buffered WriteAPI แบบ fire-and-forget)
// เพื่อให้ write error เด้งกลับมาที่ caller แบบ synchronous และรายงานใน HTTP ได้
type Influx struct {
client influxdb2.Client
writeAPI api.WriteAPIBlocking
timeout time.Duration
}
// WritePoints เขียน point แบบ synchronous
func (i *Influx) WritePoints(ctx context.Context, points ...*write.Point) error {
if len(points) == 0 {
return nil
}
if _, ok := ctx.Deadline(); !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, i.timeout)
defer cancel()
}
if err := i.writeAPI.WritePoint(ctx, points...); err != nil {
return fmt.Errorf("write influx points: %w", err)
}
return nil
}
WHY blocking? InfluxDB SDK มี 2 โหมด:
- WriteAPI (buffered, async): เร็วมาก ส่งเข้า buffer แล้วลืมได้เลย — แต่ถ้า write fail คนส่งไม่มีทางรู้ มันหายไปใน goroutine เงียบๆ
- WriteAPIBlocking (sync): ช้ากว่านิดหน่อย แต่ถ้า write fail เรา return error ได้ ทำให้ตอบ client เป็น
502ได้ทันทีสำหรับ workshop นี้เราเลือก blocking เพราะ “ความถูกต้องของ contract กับ client” สำคัญกว่า throughput สุดขีด — device จะได้รู้ว่าข้อมูลเข้าจริงไหม ไม่ใช่เดาเอา
อีกจุดที่ละเอียดคือ ถ้า ctx ยังไม่มี deadline เราจะใส่ timeout ให้เอง (SetHTTPRequestTimeout) เพื่อกัน write ค้างไปนานจนพิน goroutine
Rate Limiting — Token Bucket ต่อ Device
ลองคิดว่า device มี bug ส่งข้อมูลวนลูปไม่หยุด — ถ้าไม่กัน server ล่มได้เลย เหมือนท่อประปาไม่มีวาล์ว เปิดสุดแล้วน้ำท่วม
เราเขียน limiter เองแบบ token-bucket ไม่ต้องพึ่ง dependency ภายนอก โค้ดจาก backend/internal/ratelimit/ratelimit.go:
// Allow บอกว่า request ของ key นี้ผ่านได้ไหม ถ้าผ่านก็กิน token 1 ตัว
func (l *Limiter) Allow(key string) bool {
now := l.now()
l.mu.Lock()
defer l.mu.Unlock()
b, ok := l.buckets[key]
if !ok {
// key ใหม่เริ่มด้วย bucket เต็ม ลบ token ที่กำลังกินไป 1 ตัว
l.buckets[key] = &bucket{tokens: l.capacity - 1, lastSeen: now}
l.maybeCleanup(now)
return true
}
// เติม token ตามเวลาที่ผ่านไป แต่ไม่เกิน capacity
elapsed := now.Sub(b.lastSeen).Seconds()
b.tokens = min(l.capacity, b.tokens+elapsed*l.refillPerSec)
b.lastSeen = now
if b.tokens < 1 {
return false
}
b.tokens--
return true
}
อุปมา token-bucket: นึกถึง ถังน้ำที่มีก๊อกหยดเติมตลอดเวลา ทุก request ต้องตักน้ำ 1 แก้ว ถ้าถังว่าง (ตักเร็วเกินก๊อกเติม) ก็ต้องรอ — แต่ถ้าเว้นช่วงไม่ตักนาน ถังก็เต็มขึ้นมาเอง รองรับ “burst” ช่วงสั้นๆ ได้
จุดเด่นคือ limiter นี้แยก bucket ต่อ device (key = device_id) ไม่ใช่ต่อ IP — เพราะ IoT gateway หลายตัวอาจอยู่หลัง IP เดียวกัน ถ้า limit ด้วย IP จะโดน block กันทั้งวงโดยไม่ยุติธรรม
ค่า default จาก config คือ 120 req/min ต่อ device (APP_INGEST_RATE_LIMIT=120, APP_INGEST_RATE_WINDOW=1m) และมี maybeCleanup คอยกวาด bucket ที่ idle นานทิ้ง กันไม่ให้ map โตไม่หยุด
การประกอบร่างใน main.go
ทุกอย่างมาเสียบกันใน backend/cmd/server/main.go — เห็น dependency injection ชัดเจน:
// InfluxDB client (ตอนสร้างไม่มี network I/O — เช็ค reachability ที่ /healthz)
influx := database.NewInflux(cfg.Influx)
sensorService := service.NewSensorService(deviceRepo, influx, cfg.Influx.Measurement)
sensorLimiter := ratelimit.New(cfg.Ingest.RateLimit, cfg.Ingest.RateWindow)
sensorHandler := handler.NewSensorHandler(sensorService, sensorLimiter, cfg.Ingest.MaxBatchSize)
app := server.New(cfg, log, server.Dependencies{
Mongo: db,
Influx: influx,
DeviceService: deviceService,
SensorHandler: sensorHandler,
})
deviceRepo ตัวเดียวถูกใช้ทั้งใน device service และ sensor service — เพราะ sensor service ต้องเช็ค registry นั่นเอง
มาลุยกัน — ทดสอบ API จริง
Single Reading
# ส่ง reading เดียว (device ต้องลงทะเบียนไว้แล้วจากตอน Device API)
curl -X POST http://localhost:3000/api/v1/sensors/data \
-H "Content-Type: application/json" \
-d '{
"device_id": "sensor-temp-001",
"fields": { "temperature": 28.5, "humidity": 65.2 },
"tags": { "location": "floor-3" }
}'
ถ้าสำเร็จจะได้ 202 Accepted:
{ "data": { "accepted": 1 } }
Batch (สูงสุด 500 readings)
curl -X POST http://localhost:3000/api/v1/sensors/data/batch \
-H "Content-Type: application/json" \
-d '{
"readings": [
{ "device_id": "sensor-temp-001", "fields": { "temperature": 28.5 } },
{ "device_id": "sensor-temp-002", "fields": { "temperature": 27.1 } }
]
}'
{ "data": { "accepted": 2 } }
ลองยิงให้พัง
# 1) device ที่ไม่ได้ลงทะเบียน → 404 "device is not registered"
curl -i -X POST http://localhost:3000/api/v1/sensors/data \
-H "Content-Type: application/json" \
-d '{ "device_id": "ghost-999", "fields": { "x": 1 } }'
# 2) ยิงรัวเกิน 120 req/min ต่อ device → 429 "rate limit exceeded for device"
เมื่อโดน rate limit เด้งกลับมาแบบนี้:
╔══════════════════════════════╗
║ ║
║ ┌( ಠ_ಠ)┘ ║
║ ║
║ HTTP 429: ใจเย็นๆ device ║
║ ║
╚══════════════════════════════╝
สรุป: วันนี้เราทำอะไรไปบ้าง
| ส่วนประกอบ | รายละเอียด |
|---|---|
| Endpoints | POST /api/v1/sensors/data และ /sensors/data/batch (สูงสุด 500) |
| Model | SensorReading — fields เป็น float64, tags เป็น string, timestamp เป็น pointer |
| Validation | go-playground/validator v10 + custom rule alphanumdash |
| Registry check | เช็คทุก reading กับ MongoDB — unknown=404, disabled=403 |
| Tags enrichment | เติม device_type/group_id จาก registry (เชื่อถือได้กว่าให้ device ส่งมา) |
| Influx write | WriteAPIBlocking — error เด้งกลับเป็น HTTP 502 |
| Rate limit | token-bucket ต่อ device (default 120 req/min) ไม่พึ่ง dependency |
หัวใจของตอนนี้คือ 2 เรื่อง: เช็คก่อนเขียนเสมอ และ blocking write เพื่อให้ error ไม่เงียบหาย — ระบบที่ดีต้องบอกความจริงกับคนส่ง ไม่ใช่ยิ้มรับแล้วแอบทำข้อมูลหล่น
Next Step
ตอนหน้าเราจะเชื่อม MQTT Broker เพราะ sensor ในชีวิตจริงส่วนใหญ่ไม่ได้คุยผ่าน REST — พวกมันใช้ MQTT ที่เบากว่ามาก และจุดเด็ดคือ telemetry ที่มาทาง MQTT จะวิ่งเข้า ingestion path เดียวกันเป๊ะ กับที่เราเพิ่งสร้าง
Navigation
- ก่อนหน้า: IoT Workshop #6: Device Management API
- ถัดไป: IoT Workshop #8: MQTT Integration
- แผนการ Workshop ทั้งหมด: IoT Workshop Master Plan