IoT Workshop #2: ออกแบบ Database ให้ไม่งง

IoT Workshop #2: ออกแบบ Database ให้ไม่งง

ShowkhunWorkshop

IoT Workshop #2: ออกแบบ Database ให้ไม่งง

Branch: step-02-mongodb-models + step-07-influx-setup — อ้างอิงโค้ดจาก step-19-e2e Phase: Planning (2/3) Repo: kangana1024/showkhun-workshop


ถามจริงๆ น้องๆ เคยสงสัยไหมว่าทำไม IoT system ถึงต้องใช้ database หลายตัว? ใช้ MySQL ตัวเดียวไม่ได้เหรอ? คำตอบคือ “ได้ แต่จะเจ็บปวดมาก” 555 — มาลุยกันเลยดีกว่า เราจะออกแบบ data layer ทั้งหมดตามที่ implement จริง พร้อม WHY ก่อน HOW เสมอ!


สิ่งที่น้องๆ จะได้เรียนรู้

  • WHY ต้องใช้ 2 databases — MongoDB กับ InfluxDB ต่างกันยังไง
  • โครงสร้าง MongoDB collections จริง: devices, users, device_groups, alert_rules, alert_history
  • เทคนิค index ที่ใช้จริง — unique, compound และ TTL index บน expires_at
  • InfluxDB 2.7 buckets/measurements/tags/fields ที่ backend เขียนจริง (ไม่ใช่ retention policy แบบ 1.8)
  • หลัก tags vs fields ที่กัน high-cardinality trap
  • อ่านข้อมูลกลับด้วย Flux (ไม่ใช่ InfluxQL)

WHY ก่อน: ทำไมต้องใช้ 2 Databases?

ลองนึกภาพแบบนี้นะ — ข้อมูลใน IoT system มี 2 แบบที่ ต่างกันโดยสิ้นเชิง:

อุปมาอุปไมย: ลองนึกถึงสมุดโทรศัพท์ VS ไดอารี่บันทึกประจำวัน

  • สมุดโทรศัพท์ (MongoDB) — ข้อมูล device, user, group, alert rule — อ่านได้, แก้ได้, ลบได้, มี structure ซับซ้อน
  • ไดอารี่ (InfluxDB) — sensor readings — เขียนเพิ่มทุกวินาที, ย้อนดูตามช่วงเวลา, ไม่ค่อยแก้ไข

ถ้าเอา “ไดอารี่” ไปเก็บใน “สมุดโทรศัพท์” เดียวกัน เดี๋ยวสมุดโทรศัพท์จะหนาจนยกไม่ขึ้น (และ query ช้าสุดๆ)

( ╥ω╥ ) ← สภาพ DBA ที่ยอมเก็บ time-series ใน MySQL
Database ใช้ทำอะไร ลักษณะข้อมูล
MongoDB 8 Device registry, users, groups, alert rules/history Document-based, CRUD-heavy, schema ซับซ้อนได้
InfluxDB 2.7 Sensor readings (telemetry) Append-only, เรียงตามเวลา, query ด้วย Flux เร็วมาก
┌──────────────────────────────────────────────────┐
│                   Data Layer                      │
│                                                   │
│  ┌─────────────────┐  ┌────────────────────────┐  │
│  │    MongoDB       │  │      InfluxDB 2.7      │  │
│  │  (สมุดโทรศัพท์)  │  │       (ไดอารี่)        │  │
│  │                 │  │                        │  │
│  │  - devices      │  │  bucket: iot_workshop  │  │
│  │  - users        │  │   └ sensor_data        │  │
│  │  - device_groups│  │   └ telegraf_sensor_data│ │
│  │  - alert_rules  │  │  bucket:               │  │
│  │  - alert_history│  │   iot_workshop_downsampled│ │
│  │    (TTL index)  │  │                        │  │
│  └─────────────────┘  └────────────────────────┘  │
└──────────────────────────────────────────────────┘

หมายเหตุสำคัญ: ในโค้ดจริง ไม่มี collection commands_log หรือ configs แยก — command ถูก publish ผ่าน MQTT ตรงๆ (ไม่ persist เป็น collection) และ config ของ device ฝังอยู่ใน device document เอง เราจะออกแบบตามของจริงเท่านั้น ไม่เพิ่มสิ่งที่ไม่ได้ใช้


ภาพรวม Data Model ทั้งหมด

ก่อนลงรายละเอียด ขอให้เห็น big picture ก่อนนะ — collections เชื่อมกันยังไง:

erDiagram
    USERS ||--o{ DEVICES : owns
    USERS ||--o{ DEVICE_GROUPS : owns
    USERS ||--o{ ALERT_RULES : owns
    DEVICE_GROUPS ||--o{ DEVICES : groups
    DEVICE_GROUPS ||--o{ DEVICE_GROUPS : nests
    ALERT_RULES ||--o{ ALERT_HISTORY : triggers
    DEVICES ||--o{ ALERT_HISTORY : "by device_id"
    USERS {
        ObjectId _id
        string email UK
        string username UK
        string password_hash
        string role
    }
    DEVICES {
        ObjectId _id
        string device_id UK
        string type
        string status
        ObjectId group_id FK
        string token
    }
    DEVICE_GROUPS {
        ObjectId _id
        string name
        ObjectId parent_id FK
    }
    ALERT_RULES {
        ObjectId _id
        string type
        object condition
        string device_id
        int cooldown_seconds
    }
    ALERT_HISTORY {
        ObjectId _id
        ObjectId rule_id FK
        string device_id
        datetime expires_at "TTL"
    }

โอเค ตอนนี้เห็นภาพแล้ว มาลงรายละเอียดแต่ละ collection กัน!


MongoDB Collections

โครงสร้างทุก collection map ตรงกับ Go struct ใน backend/internal/model/ — มี BSON tag กำกับว่า field ไหนเก็บลง Mongo ยังไง

1. devices Collection — หัวใจของระบบ

นี่คือ collection ที่สำคัญที่สุด เป็น “ทะเบียนบ้าน” ของ device ทุกตัว เก็บประเภท, ตำแหน่ง, status, tag และ ingestion token (ไม่เคย serialize ออก API):

// devices collection (จาก model.Device)
{
  _id: ObjectId("..."),
  device_id: "sensor-temp-001",          // unique, human-readable id
  name: "Temperature Sensor #1",
  description: "ห้องเซิร์ฟเวอร์ ชั้น 3",
  type: "temperature_humidity",          // temperature_humidity | motion | relay | gateway
  status: "online",                      // online | offline | error | maintenance
  group_id: ObjectId("..."),             // อ้างอิง device_groups (optional)
  owner_id: ObjectId("..."),             // อ้างอิง users
  location: {                            // optional, nested
    building: "อาคาร A",
    floor: 3,
    room: "Server Room",
    latitude: 13.7563,
    longitude: 100.5018
  },
  tags: ["server-room", "critical", "floor-3"],
  enabled: true,
  token: "<ingestion-token>",            // json:"-" → ไม่หลุดออก API
  last_seen_at: ISODate("2026-03-26T10:30:00Z"),
  created_at: ISODate("2026-01-15T08:00:00Z"),
  updated_at: ISODate("2026-03-26T10:30:00Z")
}

Index ที่สร้างจริง (จาก repository/mongo/device.goEnsureIndexes):

db.devices.createIndex({ device_id: 1 }, { unique: true, name: "uniq_device_id" })
db.devices.createIndex({ status: 1, type: 1 },         { name: "status_type" })
db.devices.createIndex({ group_id: 1 },                { name: "group_id" })
db.devices.createIndex({ tags: 1 },                    { name: "tags" })
db.devices.createIndex({ created_at: -1 },             { name: "created_at_desc" })

Tips จากพี่: เราไม่เก็บ “ค่าล่าสุดที่วัดได้” ลงใน device document — ค่าล่าสุดอ่านจาก InfluxDB ด้วย Flux (last()) เอา เพราะ time-series เป็นเจ้าของข้อมูล reading จริงๆ device เก็บแค่ last_seen_at ไว้ดูว่า device ยังมีชีวิตอยู่ไหม

2. users Collection — จัดการคนที่ใช้ระบบ

จุดสำคัญ: password เก็บเป็น argon2id (ไม่ใช่ bcrypt) และมี field ที่เกี่ยวกับ refresh-token rotation เพื่อความปลอดภัย:

// users collection (จาก model.User)
{
  _id: ObjectId("..."),
  email: "[email protected]",
  username: "admin",
  password_hash: "$argon2id$v=19$m=65536,t=3,p=2$...",  // json:"-"
  role: "admin",                         // admin | operator | viewer
  display_name: "สมชาย ดีงาม",
  is_active: true,
  token_version: 0,                      // bump เพื่อ revoke token ทั้ง family
  refresh_token_hash: "<sha256 ของ jti>", // json:"-" (กัน refresh-token replay)
  last_login_at: ISODate("2026-03-26T08:00:00Z"),
  created_at: ISODate("2026-01-01T00:00:00Z"),
  updated_at: ISODate("2026-03-26T08:00:00Z")
}

Index ที่สร้างจริง:

db.users.createIndex({ email: 1 },      { unique: true, name: "uniq_email" })
db.users.createIndex({ username: 1 },   { unique: true, name: "uniq_username" })
db.users.createIndex({ created_at: -1 }, { name: "created_at_desc" })

สิทธิ์ในระบบจริงเป็นแบบ role-based (RBAC) 3 ระดับ: viewer (อ่าน), operator (เพิ่มสิทธิ์ mutation), admin (จัดการ user) — ไม่ใช่ permission array แบบ fine-grained บังคับสิทธิ์ที่ฝั่ง server ทุก route (รายละเอียดใน #21 Auth & RBAC)

3. device_groups Collection — จัดกลุ่ม device

เหมือน “folder” สำหรับจัด device ให้เป็นระเบียบ รองรับ nested groups (group ใน group ได้):

// device_groups collection (จาก model.DeviceGroup)
{
  _id: ObjectId("..."),
  name: "Server Room Sensors",
  description: "เซ็นเซอร์ทั้งหมดในห้องเซิร์ฟเวอร์",
  parent_id: null,                       // อ้างอิง group แม่ (nested)
  owner_id: ObjectId("..."),
  color: "#FF6B35",
  icon: "server",
  created_at: ISODate("2026-01-15T08:00:00Z"),
  updated_at: ISODate("2026-03-26T10:00:00Z")
}

4. alert_rules Collection — กฎการแจ้งเตือน

นี่คือ “หน้าที่เจ้าหน้าที่รักษาความปลอดภัย” รองรับ rule 3 type โดย field ใน condition ใช้ต่างกันตาม type:

// alert_rules collection (จาก model.AlertRule)
{
  _id: ObjectId("..."),
  name: "High Temperature Alert",
  description: "แจ้งเตือนเมื่ออุณหภูมิเกิน 40°C",
  type: "threshold",                     // threshold | offline | anomaly
  condition: {
    // threshold ใช้: metric + operator + value
    metric: "temperature",
    operator: "gt",                      // gt | gte | lt | lte | eq | neq
    value: 40,
    // offline ใช้: offline_after_seconds
    // anomaly  ใช้: metric + zscore_threshold
  },
  severity: "critical",                  // info | warning | critical
  device_id: "sensor-temp-001",          // เว้นว่าง = ใช้กับทุก device
  cooldown_seconds: 900,                 // กัน spam ต่อ rule+device
  enabled: true,
  owner_id: ObjectId("..."),
  created_at: ISODate("2026-02-01T00:00:00Z"),
  updated_at: ISODate("2026-03-20T00:00:00Z")
}

Index ที่สร้างจริง:

db.alert_rules.createIndex({ enabled: 1, device_id: 1 }, { name: "enabled_device" })
db.alert_rules.createIndex({ type: 1 },                  { name: "type" })

cooldown_seconds สำคัญมาก! ถ้าไม่มี พออุณหภูมิสูงค้างไว้ ทุก reading จะยิง alert รัวๆ จน Slack แตก — มันเป็นวินาที (ไม่ใช่นาที) ตาม field จริง

5. alert_history Collection — ประวัติการแจ้งเตือน + TTL

เก็บทุกครั้งที่ rule ยิงจริง และมี TTL index ให้ Mongo ลบ record เก่าทิ้งเอง:

// alert_history collection (จาก model.AlertHistory)
{
  _id: ObjectId("..."),
  rule_id: ObjectId("..."),
  rule_name: "High Temperature Alert",
  type: "threshold",
  device_id: "sensor-temp-001",
  severity: "critical",
  metric: "temperature",
  value: 42.3,
  message: "High Temperature Alert on sensor-temp-001: temperature = 42.3 gt 40",
  triggered_at: ISODate("2026-03-26T10:30:00Z"),
  expires_at: ISODate("2026-06-24T10:30:00Z")   // TTL anchor
}

Index ที่สร้างจริง (จาก repository/mongo/alert.go):

// TTL index — ลบ document เมื่อ "expires_at" ผ่านไป (expireAfterSeconds: 0)
db.alert_history.createIndex({ expires_at: 1 },
  { name: "ttl_expires_at", expireAfterSeconds: 0 })

db.alert_history.createIndex({ rule_id: 1, device_id: 1, triggered_at: -1 },
  { name: "rule_device_time" })

TTL Index แบบนี้เจ๋งกว่าที่คิด — เราตั้ง expireAfterSeconds: 0 แล้วให้แต่ละ document พก expires_at ของตัวเองมา (backend ตั้ง = triggered_at + 90 วัน) Mongo จะลบทิ้งทันทีที่ expires_at ผ่านไป ยืดหยุ่นกว่าการตั้ง TTL คงที่บน created_at — และไม่ต้องเขียน cron ลบเอง


InfluxDB 2.7 Schema Design

WHY InfluxDB? ทำไมไม่ใช้ MongoDB เก็บ sensor data เลย?

อุปมา: เหมือนถามว่า “ทำไมต้องใช้ Excel ทำ pivot table ทั้งที่ Word ก็พิมพ์ตารางได้?” — ทำได้ แต่มันไม่ได้ถูก design มาเพื่องานนั้น

InfluxDB ถูก optimize มาเพื่อ time-series โดยเฉพาะ — query 30 วันย้อนหลัง ทีละ 1 นาที บน device 100 ตัว ทำได้ใน milliseconds

⚠️ เลิกใช้ retention policy / continuous query แบบ InfluxDB 1.8 แล้ว! workshop นี้ใช้ InfluxDB 2.7 ซึ่งจัดการด้วย bucket + retention + Flux ไม่ใช่ CREATE RETENTION POLICY / CREATE CONTINUOUS QUERY แบบ InfluxQL เดิม

Buckets & Retention (2.x)

ใน InfluxDB 2.x หน่วยเก็บข้อมูลคือ bucket (มี retention ในตัว) ไม่ใช่ database + retention policy แบบเก่า backend สร้าง bucket ให้อัตโนมัติตอน startup (ปิดได้ด้วย APP_INFLUX_ENSURE_BUCKETS=false):

Bucket env Retention (default) ใช้ทำอะไร
iot_workshop APP_INFLUX_BUCKET APP_INFLUX_RAW_RETENTION=720h (30 วัน) ข้อมูล raw จาก backend + Telegraf
iot_workshop_downsampled APP_INFLUX_DOWNSAMPLE_BUCKET APP_INFLUX_DOWNSAMPLE_RETENTION=8760h (1 ปี) ข้อมูลที่ downsample แล้ว (retention ยาวกว่า)

retention ใช้ Go duration (ไม่มี suffix d) — วันแปลงเป็นชั่วโมง: 30d = 720h, 365d = 8760h ส่วน org คือ showkhun (env APP_INFLUX_ORG)

Measurement: sensor_data (backend เขียน)

ใน InfluxDB ไม่เรียก “collection” แต่เรียก measurement — คิดซะว่าคือ “ชื่อตาราง” backend เขียน point ลง measurement นี้พร้อม tag ที่ดึงจาก registry:

sensor_data  (env: APP_INFLUX_MEASUREMENT)
├── Tags (indexed, string only) ← ใช้ filter / group
│   ├── device_id    = "sensor-temp-001"   (จาก registry)
│   ├── device_type  = "temperature_humidity" (จาก registry)
│   ├── group_id     = "<ObjectId hex>"    (จาก registry, ถ้ามี group)
│   └── (+ payload tags) location / zone / unit / source ...

├── Fields (not indexed, numeric) ← ค่าจริงที่วัดได้
│   ├── temperature  = 25.5    (float)
│   ├── humidity     = 60.2    (float)
│   └── ... ทุกค่าใน payload "fields"

└── Timestamp = payload "timestamp" (ถ้ามี) หรือเวลาที่ ingest

Tags vs Fields — นี่คือจุดที่คนสับสนบ่อยที่สุด! Tags เปรียบเหมือน “หมวดหมู่” ที่ index แล้ว (low-cardinality) ส่วน Fields คือ “ค่าที่วัดได้จริง” (numeric) ถ้าเอา temperature ไปเป็น tag ระบบจะพัง เพราะ cardinality สูงมาก — model จริงบังคับ Fields map[string]float64 กับ Tags map[string]string แยกกันชัดเจน

Write example (Line Protocol):

sensor_data,device_id=sensor-temp-001,device_type=temperature_humidity,location=room-a temperature=25.5,humidity=60.2 1711443000000000000

Measurement: telegraf_sensor_data (Telegraf เขียน)

Telegraf subscribe devices/+/telemetry เหมือน backend แต่เขียน measurement ของตัวเอง เพื่อกัน double-write:

telegraf_sensor_data  (bucket เดียวกัน: iot_workshop)
├── Tags
│   ├── device_id  = ดึงจาก topic segment ที่ 2
│   └── location / zone / unit / source  (optional, จาก payload "tags")
├── Fields = ทุกค่าตัวเลขใน payload "fields" (converter coerce เป็น float)
└── Timestamp = เวลาที่ message มาถึง (Telegraf path ไม่ bind timestamp ของ payload)

ทำไมต้องแยก measurement? ถ้า backend และ Telegraf เขียน sensor_data ตัวเดียวกัน ข้อมูลจะถูกเก็บ ซ้ำสองรอบ การให้ Telegraf เขียน telegraf_sensor_data ทำให้สอง path เป็นแบบ “เพิ่มเข้ามา” ไม่ใช่ “ซ้ำ” เทียบกันได้ตรงๆ ใน workshop (production เลือก canonical writer ตัวเดียว)


Data Flow — ข้อมูลไหลยังไงตั้งแต่ sensor จนถึง database?

graph LR
    S[🌡️ Sensor] -->|MQTT| B[📡 Mosquitto]
    S -->|REST| G[🖥️ Go Backend]
    B -->|subscribe| G
    B -->|subscribe| T[🔁 Telegraf]
    G -->|validate vs registry| M[(🍃 MongoDB)]
    G -->|write sensor_data| I[(📈 InfluxDB)]
    T -->|write telegraf_sensor_data| I
    G -->|Flux read| I

เห็นไหมว่า Telegraf กับ Go Backend ทำงานคู่ขนานกัน — Telegraf เป็น “สายพานอัตโนมัติ” ที่ยกข้อมูลเข้า InfluxDB แบบ config ล้วน ส่วน Go Backend ทำมากกว่า: validate กับ registry, เติม tag, fan-out WebSocket และ run alert engine


Query Patterns — ตัวอย่าง Queries ที่ใช้บ่อย

MongoDB Queries (typed filter)

// device ที่ online ในกลุ่มหนึ่ง เรียงตามชื่อ
db.devices.find({
  group_id: ObjectId("..."),
  status: "online",
  enabled: true
}).sort({ name: 1 })

// rule ที่ enabled สำหรับ device หนึ่ง (engine ใช้ตอนประเมิน)
db.alert_rules.find({
  enabled: true,
  $or: [{ device_id: "sensor-temp-001" }, { device_id: { $exists: false } }, { device_id: "" }]
})

// ประวัติ alert ล่าสุดของ device
db.alert_history.find({ device_id: "sensor-temp-001" })
  .sort({ triggered_at: -1 }).limit(20)

InfluxDB Queries (Flux — ไม่ใช่ InfluxQL)

InfluxDB 2.x ใช้ภาษา Flux อ่านข้อมูล ในระบบจริง backend สร้าง Flux query แบบ bounded + parameterised (ไม่ interpolate ค่าของ user ลง query string ตรงๆ กัน injection):

// readings ดิบของ device ย้อนหลัง 6 ชั่วโมง (เฉพาะ field temperature)
from(bucket: "iot_workshop")
  |> range(start: -6h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r.device_id == "sensor-temp-001")
  |> filter(fn: (r) => r._field == "temperature")
  |> limit(n: 200)

// downsample ค่าเฉลี่ยรายชั่วโมง 24 ชั่วโมงล่าสุด
from(bucket: "iot_workshop")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "sensor_data" and r.device_id == "sensor-temp-001")
  |> aggregateWindow(every: 1h, fn: mean)

// ค่าล่าสุดต่อ device
from(bucket: "iot_workshop")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> last()

backend เปิด API ให้เรียก Flux เหล่านี้ผ่าน REST: GET /api/v1/devices/:id/readings และ GET /api/v1/sensors/query รับ query param range, window, aggregate (mean|max|min|sum|last|first|count), field, limit ทุกค่าถูก clamp กับ ceiling ใน config (APP_INFLUX_MAX_QUERY_RANGE, APP_INFLUX_MAX_QUERY_POINTS)


Migration & Seed Data Plan

MongoDB — Indexes อัตโนมัติ + Seed Admin

ไม่ต้องเขียน migration script แยก! backend เรียก EnsureIndexes ของแต่ละ repository ตอน startup (idempotent ปลอดภัยทุกครั้ง) ส่วน admin คนแรก seed ด้วย Makefile:

# สร้าง JWT secret (อย่างน้อย 32 bytes) — ใส่ใน APP_AUTH_JWT_SECRET
openssl rand -base64 48

# bootstrap admin คนแรก (ทำงานครั้งเดียวตอน users collection ว่าง)
make seed-admin [email protected] ADMIN_PASSWORD='a-strong-password'

ไม่ใช้ JSON-schema validator บน collection — เราบังคับ format ที่ application layer ด้วย go-playground/validator (model มี validate:"..." tag) และ enum type ของ Go (DeviceType.Valid(), AlertRuleType.Valid() ฯลฯ) ซึ่งจับ garbage data ได้ตั้งแต่ก่อนถึง Mongo

InfluxDB — Provision ผ่าน Docker + Ensure ตอน startup

# infra/docker-compose.yml ตั้ง org/bucket/token ให้อัตโนมัติด้วย DOCKER_INFLUXDB_INIT_*
# backend EnsureBuckets() ตอน startup: สร้าง bucket raw + downsampled + align retention
# (ปิดได้ด้วย APP_INFLUX_ENSURE_BUCKETS=false ถ้า token เป็น read-only)

สรุป — เราได้ออกแบบอะไรไปบ้าง?

ใน workshop #2 นี้เราลุยกันไปเยอะมาก:

  • MongoDB Schema — 5 collections จริง (devices, users, device_groups, alert_rules, alert_history) พร้อม index ที่ใช้จริง รวม TTL index บน expires_at
  • InfluxDB 2.7 Schema — 2 bucket (raw + downsampled) และ 2 measurement (sensor_data จาก backend + telegraf_sensor_data จาก Telegraf) แยก tags/fields ตามหลัก time-series
  • Flux query — อ่านข้อมูลกลับด้วย Flux แบบ bounded + parameterised (ไม่ใช่ InfluxQL/retention policy แบบ 1.8)
  • Query Patterns — ตัวอย่างจริงทั้ง Mongo (typed filter) และ InfluxDB (Flux)
  • Seed/Migration — index auto ตอน startup + seed admin ผ่าน Makefile, validate ที่ application layer

(ง •̀_•́)ง Database design เสร็จแล้ว! ตอนนี้น้องๆ ควรเห็นชัดว่า data ทุก type อยู่ที่ไหน เก็บยังไง และเชื่อมกันด้วยอะไร — นั่นคือ foundation ที่แข็งแกร่งก่อนจะลงมือ code จริง


Navigation: