ติดตั้ง InfluxDB 2.7 + Telegraf เป็นคลังข้อมูล IoT

ติดตั้ง InfluxDB 2.7 + Telegraf เป็นคลังข้อมูล IoT

ShowkhunWorkshop

Branch: step-07-influx-setup (โครงรวมจบที่ step-09-alerting) Phase: Development (7/9) — Time-series Storage Repo: kangana1024/showkhun-workshop


ถ้าเรามองว่า sensor คือคนที่เก็บข้อมูล แล้วถามว่า — ข้อมูลพวกนั้นไปนอนอยู่ที่ไหน? ใครจัดเก็บ? ใครลบของเก่าทิ้งให้?

คำตอบคือ InfluxDB ครับ — time-series database ที่เป็น “คลังเก็บของ” ของระบบ IoT เรา วันนี้เราจะติดตั้ง InfluxDB 2.7 คู่กับ Telegraf ด้วย Docker Compose พร้อม config ที่ใช้ในโปรเจกต์จริง มาลุยกันเลย! (ง •̀_•́)ง

หมายเหตุก่อนเริ่ม: ตอนแรกๆ ที่วาง draft เราเคยคิดจะใช้ “TICK Stack” (InfluxDB 1.8 + Telegraf + Chronograf + Kapacitor) แต่พอลงมือทำจริง โปรเจกต์นี้เลือก InfluxDB 2.7 + Flux เป็น query language และเขียน alerting engine เป็น Go เอง (จะเล่าใน ตอนที่ 9) — Chronograf/Kapacitor เลยไม่ได้ใช้ บทความนี้จึงอ้างอิงตามโค้ดจริงทั้งหมด


น้องๆ จะได้อะไรจาก workshop นี้

  • เข้าใจว่า ทำไม IoT ต้องใช้ time-series database แทน relational ธรรมดา
  • รู้ความต่างของ InfluxDB 2.x (Org / Bucket / Token / Flux) กับ 1.x แบบเก่า
  • ติดตั้ง InfluxDB 2.7 + Telegraf ด้วย Docker Compose ที่ pin version ครบ
  • เซ็ตอัป Org, Bucket, Retention และ bucket สำหรับ downsampled แยกต่างหาก
  • เข้าใจว่า backend (Go) กับ Telegraf เขียนลงคนละ measurement ได้ยังไงโดยไม่ตีกัน
  • รัน + ทดสอบ connectivity ทั้งระบบ

ก่อนอื่น — ทำไมต้อง Time-series Database?

ลองนึกภาพว่าอาคารมีเซ็นเซอร์วัดอุณหภูมิ 500 ตัว ส่งข้อมูลทุก 5 วินาที

ถ้าเราเก็บลง MySQL แบบปกติ… ตารางจะพองขึ้น หลายล้านแถวต่อวัน query ช้า index พัง ชีวิตพัง

InfluxDB ถูกออกแบบมาเพื่อเรื่องนี้โดยเฉพาะ — มันรู้ว่าข้อมูลมี “เวลา” เป็นแกนหลัก เก็บได้เร็ว query ได้เร็ว และที่เด็ดสุดคือ ลบข้อมูลเก่าให้อัตโนมัติ ตาม retention ที่เราตั้งไว้ ไม่ต้องมานั่งเขียน cron ลบเอง

อุปมา: InfluxDB เหมือนตู้เก็บเอกสารอัจฉริยะ ที่รู้เองว่า “เอกสารเก่าเกิน 30 วัน ฉีกทิ้งได้” โดยไม่ต้องรอเลขาฯ มาสั่ง


InfluxDB 2.x ต่างจาก 1.x ยังไง?

อันนี้สำคัญมาก เพราะถ้าเปิด tutorial เก่าๆ ในเน็ต ส่วนใหญ่ยังเป็น 1.x ซึ่ง concept คนละเรื่องกันเลย:

แนวคิด InfluxDB 1.x (เก่า) InfluxDB 2.x (ที่เราใช้)
หน่วยเก็บข้อมูล database + retention policy bucket (รวม retention มาในตัว)
การยืนยันตัวตน username / password API token
การจัดกลุ่ม ไม่มี org (organization)
ภาษา query InfluxQL (คล้าย SQL) Flux (เป็น pipeline |>)

ในโปรเจกต์นี้ทุกอย่างวิ่งบน 2.x หมด: เราคุยกับ InfluxDB ด้วย token, เขียน/อ่านเข้า bucket ภายใต้ org ชื่อ showkhun และ query ด้วย Flux (ตอนหน้า จะเจอ Flux เต็มๆ)


ภาพรวม Architecture

ก่อนลงมือ เราต้องรู้ว่าข้อมูลไหลยังไง:

graph LR
    A[🌡️ Sensors] -->|publish| B[📡 Mosquitto MQTT]
    B -->|subscribe| C[🤖 Telegraf]
    B -->|subscribe| D[🖥️ Go Backend]
    C -->|telegraf_sensor_data| E[(💾 InfluxDB 2.7)]
    D -->|sensor_data| E
    D -->|Flux query| F[📊 REST API]

สังเกตว่าข้อมูลจาก MQTT มี 2 ทางวิ่งเข้า InfluxDB ขนานกัน — ทั้ง Telegraf และ Go backend ต่าง subscribe topic เดียวกัน แต่เขียนคนละ measurement (เดี๋ยวเราจะเคลียร์เรื่องนี้กัน) ส่วนการ “อ่าน” ออกไปให้ frontend เป็นหน้าที่ของ Go backend ที่ยิง Flux query


Step 1: เลือก Image แล้ว pin version ให้ครบ

ทำไม ต้อง pin version ถึง patch? เพราะ workshop ต้อง reproduce ได้เป๊ะทุกครั้ง ถ้าปล่อย :latest วันนี้ลงได้ พรุ่งนี้ image อัปเดตแล้วพังก็ปวดหัว เหมือนสั่งกาแฟสูตรเดิมแต่บาริสต้าเปลี่ยนทุกวัน

นี่คือ version จริงที่โปรเจกต์ pin ไว้ (จาก infra/docker-compose.yml):

mongodb     : mongo:8.0.16              # device registry, users, alert rules/history
mosquitto   : eclipse-mosquitto:2.0.22  # MQTT broker
influxdb    : influxdb:2.7.12           # time-series storage
telegraf    : telegraf:1.39.0           # MQTT -> InfluxDB ingestion agent

ในตอนนี้เราโฟกัสที่ influxdb กับ telegraf เป็นหลัก (mongodb/mosquitto ตั้งไว้ตั้งแต่ตอนก่อนๆ แล้ว)


Step 2: Docker Compose สำหรับ InfluxDB 2.7

ทำไม ถึงใช้ Docker Compose? เพราะเราต้องการให้ทุก container อยู่ใน network เดียวกัน คุยกันด้วย hostname (influxdb, mosquitto) โดยไม่ต้องจำ IP — เหมือนคนในบ้านเดียวกันตะโกนเรียกชื่อกันได้เลย

จุดที่ต่างจาก InfluxDB 1.x แบบชัดเจนคือ DOCKER_INFLUXDB_INIT_* — มันคือ “setup mode” ที่ตอน container บูตครั้งแรกจะสร้าง org, bucket, user, และ admin token ให้เสร็จในก้าวเดียว ไม่ต้องเข้าไป config มือทีหลัง

# infra/docker-compose.yml (เฉพาะ 2 service ที่เกี่ยวกับตอนนี้)
name: showkhun-iot

services:
  influxdb:
    image: influxdb:2.7.12
    container_name: showkhun-influxdb
    restart: unless-stopped
    ports:
      - "${INFLUX_PORT:-8086}:8086"
    environment:
      # setup mode = บูตครั้งแรกสร้าง org/bucket/token ให้เลย
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: ${INFLUX_USERNAME:-admin}
      DOCKER_INFLUXDB_INIT_PASSWORD: ${INFLUX_PASSWORD:-adminpassword}
      DOCKER_INFLUXDB_INIT_ORG: ${INFLUX_ORG:-showkhun}
      DOCKER_INFLUXDB_INIT_BUCKET: ${INFLUX_BUCKET:-iot_workshop}
      DOCKER_INFLUXDB_INIT_RETENTION: ${INFLUX_RETENTION:-30d}
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${INFLUX_TOKEN:-dev-influx-token-change-me}
    volumes:
      - influx_data:/var/lib/influxdb2     # ข้อมูลจริง (2.x ใช้ /var/lib/influxdb2)
      - influx_config:/etc/influxdb2       # config + token
    healthcheck:
      test: ["CMD", "influx", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 15s

  telegraf:
    image: telegraf:1.39.0
    container_name: showkhun-telegraf
    restart: unless-stopped
    depends_on:
      influxdb:
        condition: service_healthy   # รอ InfluxDB พร้อมก่อนค่อยเริ่ม
      mosquitto:
        condition: service_healthy
    environment:
      # ส่ง secret เข้าทาง env ไม่ hard-code ในไฟล์ config
      INFLUX_TOKEN: ${INFLUX_TOKEN:-dev-influx-token-change-me}
      INFLUX_ORG: ${INFLUX_ORG:-showkhun}
      INFLUX_BUCKET: ${INFLUX_BUCKET:-iot_workshop}
      MQTT_BROKER_URL: tcp://mosquitto:1883
    volumes:
      - ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro

volumes:
  influx_data:
  influx_config:

สังเกต depends_on: ... condition: service_healthy — Telegraf จะรอจน InfluxDB และ Mosquitto ผ่าน healthcheck ก่อนค่อยสตาร์ท ไม่งั้น connect ไม่ติดตั้งแต่แรก (ˊ•͈ ꇴ •͈ˋ)


Step 3: ตั้งค่า .env (Org / Bucket / Token)

ทำไม ต้องแยก secret ออกมาไว้ใน .env? เพราะ token คือกุญแจบ้าน ถ้าเผลอ commit ขึ้น git ก็เหมือนแปะกุญแจไว้หน้าประตู โปรเจกต์เลยให้ copy จาก .env.example มาเป็น .env แล้วแก้ค่าจริงเอง (.env อยู่ใน .gitignore)

สร้าง infra/.env จาก template:

cd infra
cp .env.example .env

ค่าใน .env.example (ส่วน InfluxDB) หน้าตาแบบนี้ — ใช้ค่า default สำหรับ local ได้เลย แต่ production ต้องเปลี่ยน INFLUX_TOKEN เป็น token จริงที่แข็งแรง:

# --- InfluxDB 2.x ---
INFLUX_PORT=8086
INFLUX_USERNAME=admin
INFLUX_PASSWORD=adminpassword
INFLUX_ORG=showkhun
INFLUX_BUCKET=iot_workshop
INFLUX_RETENTION=30d
# Generate a strong token for anything beyond local development.
INFLUX_TOKEN=dev-influx-token-change-me

สำคัญ: INFLUX_TOKEN ตัวนี้ต้องตรงกันทั้ง 3 ที่: container ของ InfluxDB (เป็น admin token), Telegraf (เอาไว้เขียน), และ Go backend (เอาไว้ทั้งเขียนและอ่าน) — ถ้าไม่ตรง จะเขียนไม่เข้าหรืออ่านไม่ออก


Step 4: Retention และ Bucket แบบ Downsampled

นี่คือส่วนที่หลายคนพลาด เราคุยเรื่อง ทำไม ก่อนเสมอ

ทำไมต้องมี Retention?

ใน InfluxDB 2.x retention ผูกมากับ bucket เลย ไม่ได้แยกเป็น “retention policy” เหมือน 1.x — bucket หนึ่งใบมี retention หนึ่งค่า ในโปรเจกต์เราตั้ง raw bucket ไว้ 30 วัน เก็บข้อมูลละเอียด พอเลย 30 วัน InfluxDB ลบให้เอง

ทำไมต้องมี bucket “downsampled” แยก?

raw data ละเอียดมากแต่ก็กินที่มาก เราเลยอยากเก็บ “ข้อมูลสรุป” (เช่น ค่าเฉลี่ยรายชั่วโมง) ไว้นานกว่า โดยไม่ต้องแบกข้อมูลดิบ — โปรเจกต์เลยมี bucket ที่ 2 ชื่อ iot_workshop_downsampled เก็บนานถึง 1 ปี

อุปมา: raw bucket คือสมุดจดทุกนาที (เก็บ 1 เดือนพอ), downsampled bucket คือสรุปรายเดือนใส่ใน Excel (เก็บไว้เป็นปีก็ไม่หนัก)

ฝั่ง Go backend มี routine ชื่อ EnsureBuckets ที่ตอนสตาร์ทจะ สร้าง bucket ให้อัตโนมัติถ้ายังไม่มี และปรับ retention ให้ตรงถ้าเพี้ยน — เป็น idempotent เรียกกี่รอบก็ปลอดภัย โค้ดจริงจาก backend/internal/database/influx_setup.go:

// EnsureBuckets สร้าง bucket ที่ระบบต้องใช้ถ้ายังไม่มี และจัด retention ให้ตรง
// เรียกได้ทุกครั้งตอน startup (idempotent)
func (i *Influx) EnsureBuckets(ctx context.Context, cfg config.InfluxConfig) error {
	specs := []BucketSpec{
		{Name: cfg.Bucket, Retention: cfg.RawRetention}, // raw 30 วัน
	}
	// ถ้าตั้ง downsample bucket ไว้และไม่ซ้ำกับ raw → เพิ่มเข้าไปด้วย
	if cfg.DownsampleBucket != "" && cfg.DownsampleBucket != cfg.Bucket {
		specs = append(specs, BucketSpec{
			Name:      cfg.DownsampleBucket,
			Retention: cfg.DownsampleRetention, // 1 ปี
		})
	}

	orgAPI := i.client.OrganizationsAPI()
	org, err := orgAPI.FindOrganizationByName(ctx, i.org)
	if err != nil {
		return fmt.Errorf("ensure buckets: find org %q: %w", i.org, err)
	}

	for _, spec := range specs {
		if err := i.ensureBucket(ctx, org, spec); err != nil {
			return err
		}
	}
	return nil
}

ค่า bucket/retention เหล่านี้มาจาก config ของ backend (backend/internal/config/config.go) ซึ่ง default ตรงกับ docker-compose:

INFLUX_ORG                  = showkhun
INFLUX_BUCKET               = iot_workshop                # raw
INFLUX_DOWNSAMPLE_BUCKET    = iot_workshop_downsampled    # สรุป
INFLUX_RAW_RETENTION        = 720h   (= 30 วัน)
INFLUX_DOWNSAMPLE_RETENTION = 8760h  (= 365 วัน)
INFLUX_MEASUREMENT          = sensor_data
INFLUX_ENSURE_BUCKETS       = true   # ให้ backend สร้าง/จัด bucket ให้ตอนบูต

เกร็ด: duration ในนี้ใช้หน่วยชั่วโมง (720h) เพราะ Go duration ไม่มีหน่วย “วัน” (d) — ก็เลยต้องคูณเอา 30×24 = 720 นั่นแหละครับ


Step 5: เข้าใจ “สองทางเขียน” — Telegraf vs Go Backend

นี่คือจุดที่ workshop นี้จงใจออกแบบให้เห็น 2 วิธี ingestion ข้างกัน เพื่อเทียบ:

graph TD
    M[📡 devices/+/telemetry] --> T[🤖 Telegraf]
    M --> G[🖥️ Go Backend]
    T -->|measurement: telegraf_sensor_data| B[(iot_workshop bucket)]
    G -->|measurement: sensor_data| B

ทั้งสองตัว subscribe topic เดียวกัน (devices/+/telemetry) และเขียนลง bucket เดียวกัน (iot_workshop) — แต่เขียนคนละ measurement ถ้าเขียน measurement เดียวกัน ทุก reading จะถูกเก็บซ้ำสองรอบทันที! โปรเจกต์เลยให้:

  • Telegraf เขียน measurement telegraf_sensor_data (pure config, ไม่มี custom code)
  • Go backend เขียน measurement sensor_data (validate กับ device registry, เติม tag, fan-out ออก WebSocket)

ใน production จริงเลือกใช้ทางเดียวพอ — แต่ระหว่าง workshop รันทั้งคู่เพื่อเทียบได้เลย เพราะมันเป็น “ของเสริม” ไม่ใช่ “ของซ้ำ”

อุปมา: เหมือนมีพนักงาน 2 คนรับออเดอร์โต๊ะเดียวกัน แต่จดลงคนละสมุด — ออเดอร์ไม่ตีกัน เพราะแยกสมุดชัดเจน


Step 6: รันระบบขึ้นมา

cd infra

# สตาร์ททุก service (mongodb, mosquitto, influxdb, telegraf)
docker compose up -d

# ดูสถานะ — รอจน influxdb เป็น (healthy) ก่อน
docker compose ps

# ดู log ของ InfluxDB และ Telegraf
docker compose logs -f influxdb
docker compose logs -f telegraf

Step 7: ทดสอบ Connectivity

ถ้าทุกอย่างขึ้นครบแล้ว ลองเช็คทีละตัวครับ:

# 1) InfluxDB ตอบ ping ไหม (2.x ใช้คำสั่ง `influx ping` ในตัว container)
docker exec showkhun-influxdb influx ping
# ควรได้: OK

# 2) เช็คว่า bucket ถูกสร้างครบ (ต้องเห็น iot_workshop)
docker exec showkhun-influxdb influx bucket list \
  --org showkhun \
  --token dev-influx-token-change-me

# 3) ลอง query ด้วย Flux ผ่าน HTTP API ตรงๆ
curl -s --request POST http://localhost:8086/api/v2/query?org=showkhun \
  --header "Authorization: Token dev-influx-token-change-me" \
  --header "Accept: application/csv" \
  --header "Content-type: application/vnd.flux" \
  --data 'from(bucket:"iot_workshop") |> range(start:-1h) |> limit(n:5)'

ฝั่ง Go backend ก็เช็คสุขภาพ InfluxDB ผ่าน health endpoint ของมันเองได้ (โค้ด Ping อยู่ใน backend/internal/database/influx.go):

// Ping เช็คว่า InfluxDB เข้าถึงได้ — ใช้เป็น health check ของ backend
func (i *Influx) Ping(ctx context.Context) error {
	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, i.timeout)
		defer cancel()
	}
	ok, err := i.client.Ping(ctx)
	if err != nil {
		return fmt.Errorf("ping influx: %w", err)
	}
	if !ok {
		return fmt.Errorf("ping influx: server reported not ready")
	}
	return nil
}
# health check ของ backend (รวมเช็ค influxdb อยู่ในนั้น)
curl -s http://localhost:8080/healthz | jq

สรุปสิ่งที่ทำใน Workshop นี้

 _________________________________________
|  Time-series storage is ready!          |
|   InfluxDB 2.7.12     [OK]              |
|   Telegraf 1.39.0     [OK]              |
|   org: showkhun       [OK]              |
|   bucket: iot_workshop (30d)   [OK]     |
|   bucket: ...downsampled (1y)  [OK]     |
|_________________________________________|
            \(^o^)/

น้องๆ ทำอะไรไปบ้างในตอนนี้:

สิ่งที่ทำ รายละเอียด
Docker Compose deploy InfluxDB 2.7.12 + Telegraf 1.39.0 พร้อม healthcheck
Setup mode บูตครั้งแรกสร้าง org showkhun, bucket iot_workshop, admin token ให้เลย
Retention raw 30 วัน + bucket downsampled 1 ปี (จัดให้โดย EnsureBuckets)
สองทางเขียน Telegraf (telegraf_sensor_data) + Go backend (sensor_data) ไม่ตีกัน
Connectivity influx ping, Flux query ผ่าน HTTP API, backend health check

สิ่งที่เราสร้างวันนี้คือ “คลังข้อมูล” ของระบบ IoT ทั้งหมด ข้อมูลจะไหลจาก sensor ผ่าน MQTT แล้วลงไปนอนใน InfluxDB รอให้เราดึงออกมาทำ dashboard และ alert ต่อไป


ขั้นตอนต่อไป

ตอนหน้าเราจะเจาะ Telegraf Pipeline แบบ json_v2 ตัวจริงในโปรเจกต์ — ตั้งแต่ subscribe MQTT, แกะ payload, ปั้น tag/field, จนเขียนลง influxdb_v2 พร้อมเขียน Flux query ดึงข้อมูลออกมาทาง REST API ด้วย อย่าพลาดนะครับ! (ง •̀_•́)ง