From 7bd2ba7b5417ad4ee47e5dadfba0d1d062feb667 Mon Sep 17 00:00:00 2001 From: "CanbiZ (MickLesk)" <47820557+MickLesk@users.noreply.github.com> Date: Mon, 9 Feb 2026 15:53:11 +0100 Subject: [PATCH] Add telemetry-ingest service and Docker updates Introduce a new telemetry ingestion service (misc/data/service.go) that implements an HTTP server to accept telemetry payloads, validate and sanitize inputs, apply rate limiting, compute dedupe hashes, and forward records to PocketBase with token-based auth. Add module file (misc/data/go.mod) setting module telemetry-ingest and Go version 1.25.5. Update Dockerfile to use golang:1.25-alpine and remove baked-in environment defaults (so runtime envs are required), keeping the build stage and final CMD. These changes add the core ingestion logic, dependency module, and align the build image/version. --- misc/data/Dockerfile | 9 +- misc/data/go.mod | 3 + misc/data/service.go | 566 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 570 insertions(+), 8 deletions(-) create mode 100644 misc/data/go.mod diff --git a/misc/data/Dockerfile b/misc/data/Dockerfile index f947b3a2d..9967acd25 100644 --- a/misc/data/Dockerfile +++ b/misc/data/Dockerfile @@ -1,5 +1,5 @@ # build stage -FROM golang:1.23-alpine AS build +FROM golang:1.25-alpine AS build WORKDIR /src COPY . . RUN go build -trimpath -ldflags "-s -w" -o /out/telemetry-ingest ./main.go @@ -11,11 +11,4 @@ USER app WORKDIR /app COPY --from=build /out/telemetry-ingest /app/telemetry-ingest EXPOSE 8080 -ENV LISTEN_ADDR=":8080" \ - MAX_BODY_BYTES="1024" \ - RATE_LIMIT_RPM="60" \ - RATE_BURST="20" \ - RATE_KEY_MODE="ip" \ - ENABLE_REQUEST_LOGGING="false" \ - UPSTREAM_TIMEOUT_MS="4000" CMD ["/app/telemetry-ingest"] diff --git a/misc/data/go.mod b/misc/data/go.mod new file mode 100644 index 000000000..ec6e8f2dd --- /dev/null +++ b/misc/data/go.mod @@ -0,0 +1,3 @@ +module telemetry-ingest + +go 1.25.5 diff --git a/misc/data/service.go b/misc/data/service.go index e69de29bb..99f322121 100644 --- a/misc/data/service.go +++ b/misc/data/service.go @@ -0,0 +1,566 @@ +package main + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "os" + "strings" + "sync" + "time" +) + +type Config struct { + ListenAddr string + TrustedProxiesCIDR []string + + // PocketBase + PBBaseURL string + PBAuthCollection string // "_dev_telemetry_service" + PBIdentity string // email + PBPassword string + PBTargetColl string // "_dev_telemetry_data" + + // Limits + MaxBodyBytes int64 + RateLimitRPM int // requests per minute per key + RateBurst int // burst tokens + RateKeyMode string // "ip" or "header" + RateKeyHeader string // e.g. "X-Telemetry-Key" + RequestTimeout time.Duration // upstream timeout + EnableReqLogging bool // default false (GDPR-friendly) +} + +type TelemetryIn struct { + Script string `json:"script"` + Version string `json:"version"` + Event string `json:"event"` + OsType string `json:"os_type"` + OsVersion string `json:"os_version,omitempty"` + PveVer string `json:"pve_version,omitempty"` + Arch string `json:"arch"` + Method string `json:"method,omitempty"` + Status string `json:"status,omitempty"` + ExitCode int `json:"exit_code,omitempty"` + Error string `json:"error,omitempty"` // must be sanitized/short +} + +type TelemetryOut struct { + Script string `json:"script"` + Version string `json:"version"` + Event string `json:"event"` + OsType string `json:"os_type"` + OsVersion string `json:"os_version,omitempty"` + PveVer string `json:"pve_version,omitempty"` + Arch string `json:"arch"` + Method string `json:"method,omitempty"` + Status string `json:"status,omitempty"` + ExitCode int `json:"exit_code,omitempty"` + Error string `json:"error,omitempty"` + + TS int64 `json:"ts"` + IngestDay string `json:"ingest_day"` + Hash string `json:"hash"` +} + +type PBClient struct { + baseURL string + authCollection string + identity string + password string + targetColl string + + mu sync.Mutex + token string + exp time.Time + http *http.Client +} + +func NewPBClient(cfg Config) *PBClient { + return &PBClient{ + baseURL: strings.TrimRight(cfg.PBBaseURL, "/"), + authCollection: cfg.PBAuthCollection, + identity: cfg.PBIdentity, + password: cfg.PBPassword, + targetColl: cfg.PBTargetColl, + http: &http.Client{ + Timeout: cfg.RequestTimeout, + }, + } +} + +func (p *PBClient) ensureAuth(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + // refresh if token missing or expiring soon + if p.token != "" && time.Until(p.exp) > 60*time.Second { + return nil + } + + body := map[string]string{ + "identity": p.identity, + "password": p.password, + } + b, _ := json.Marshal(body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + fmt.Sprintf("%s/api/collections/%s/auth-with-password", p.baseURL, p.authCollection), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10)) + return fmt.Errorf("pocketbase auth failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + + var out struct { + Token string `json:"token"` + // record omitted + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return err + } + if out.Token == "" { + return errors.New("pocketbase auth token missing") + } + + // PocketBase JWT exp can be parsed, but keep it simple: set 50 min + p.token = out.Token + p.exp = time.Now().Add(50 * time.Minute) + return nil +} + +func (p *PBClient) CreateTelemetry(ctx context.Context, payload TelemetryOut) error { + if err := p.ensureAuth(ctx); err != nil { + return err + } + + b, _ := json.Marshal(payload) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, p.targetColl), + bytes.NewReader(b), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+p.token) + + resp, err := p.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10)) + return fmt.Errorf("pocketbase create failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) + } + return nil +} + +// -------- Rate limiter (token bucket / minute window, simple) -------- + +type bucket struct { + tokens int + reset time.Time +} + +type RateLimiter struct { + mu sync.Mutex + buckets map[string]*bucket + rpm int + burst int + window time.Duration + cleanInt time.Duration +} + +func NewRateLimiter(rpm, burst int) *RateLimiter { + rl := &RateLimiter{ + buckets: make(map[string]*bucket), + rpm: rpm, + burst: burst, + window: time.Minute, + cleanInt: 5 * time.Minute, + } + go rl.cleanupLoop() + return rl +} + +func (r *RateLimiter) cleanupLoop() { + t := time.NewTicker(r.cleanInt) + defer t.Stop() + for range t.C { + now := time.Now() + r.mu.Lock() + for k, b := range r.buckets { + if now.After(b.reset.Add(2 * r.window)) { + delete(r.buckets, k) + } + } + r.mu.Unlock() + } +} + +func (r *RateLimiter) Allow(key string) bool { + if r.rpm <= 0 { + return true + } + now := time.Now() + r.mu.Lock() + defer r.mu.Unlock() + + b, ok := r.buckets[key] + if !ok || now.After(b.reset) { + r.buckets[key] = &bucket{tokens: min(r.burst, r.rpm), reset: now.Add(r.window)} + b = r.buckets[key] + } + if b.tokens <= 0 { + return false + } + b.tokens-- + return true +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// -------- Utility: GDPR-safe key extraction -------- + +type ProxyTrust struct { + nets []*net.IPNet +} + +func NewProxyTrust(cidrs []string) (*ProxyTrust, error) { + var nets []*net.IPNet + for _, c := range cidrs { + _, n, err := net.ParseCIDR(strings.TrimSpace(c)) + if err != nil { + return nil, err + } + nets = append(nets, n) + } + return &ProxyTrust{nets: nets}, nil +} + +func (pt *ProxyTrust) isTrusted(ip net.IP) bool { + for _, n := range pt.nets { + if n.Contains(ip) { + return true + } + } + return false +} + +func getClientIP(r *http.Request, pt *ProxyTrust) net.IP { + // If behind reverse proxy, trust X-Forwarded-For only if remote is trusted proxy. + host, _, _ := net.SplitHostPort(r.RemoteAddr) + remote := net.ParseIP(host) + if remote == nil { + return nil + } + + if pt != nil && pt.isTrusted(remote) { + xff := r.Header.Get("X-Forwarded-For") + if xff != "" { + parts := strings.Split(xff, ",") + ip := net.ParseIP(strings.TrimSpace(parts[0])) + if ip != nil { + return ip + } + } + } + return remote +} + +// -------- Validation (strict allowlist) -------- + +var ( + allowedEvents = map[string]bool{"install": true, "update": true, "error": true} + allowedOsType = map[string]bool{"pve": true, "lxc": true, "vm": true, "debian": true, "ubuntu": true, "alpine": true} + allowedArch = map[string]bool{"amd64": true, "arm64": true} +) + +func sanitizeShort(s string, max int) string { + s = strings.TrimSpace(s) + if s == "" { + return "" + } + // remove line breaks and high-risk chars + s = strings.ReplaceAll(s, "\n", " ") + s = strings.ReplaceAll(s, "\r", " ") + if len(s) > max { + s = s[:max] + } + return s +} + +func validate(in *TelemetryIn) error { + in.Script = sanitizeShort(in.Script, 64) + in.Version = sanitizeShort(in.Version, 32) + in.Event = sanitizeShort(in.Event, 16) + in.OsType = sanitizeShort(in.OsType, 16) + in.Arch = sanitizeShort(in.Arch, 16) + in.Method = sanitizeShort(in.Method, 32) + in.Status = sanitizeShort(in.Status, 16) + in.OsVersion = sanitizeShort(in.OsVersion, 32) + in.PveVer = sanitizeShort(in.PveVer, 32) + + // IMPORTANT: "error" must be short and not contain identifiers/logs + in.Error = sanitizeShort(in.Error, 120) + + if in.Script == "" || in.Version == "" || in.Event == "" || in.OsType == "" || in.Arch == "" { + return errors.New("missing required fields") + } + if !allowedEvents[in.Event] { + return errors.New("invalid event") + } + if !allowedOsType[in.OsType] { + return errors.New("invalid os_type") + } + if !allowedArch[in.Arch] { + return errors.New("invalid arch") + } + // exit_code only relevant for error, but allow 0..255 + if in.ExitCode < 0 || in.ExitCode > 255 { + return errors.New("invalid exit_code") + } + return nil +} + +func computeHash(out TelemetryOut) string { + // hash over non-identifying fields (no IP) to enable dedupe if needed + key := fmt.Sprintf("%s|%s|%s|%s|%s|%s|%d", + out.Script, out.Version, out.Event, out.OsType, out.Arch, out.IngestDay, out.ExitCode, + ) + sum := sha256.Sum256([]byte(key)) + return hex.EncodeToString(sum[:]) +} + +// -------- HTTP server -------- + +func main() { + cfg := Config{ + ListenAddr: env("LISTEN_ADDR", ":8080"), + TrustedProxiesCIDR: splitCSV(env("TRUSTED_PROXIES_CIDR", "")), + + PBBaseURL: mustEnv("PB_URL"), + PBAuthCollection: env("PB_AUTH_COLLECTION", "_dev_telemetry_service"), + PBIdentity: mustEnv("PB_IDENTITY"), + PBPassword: mustEnv("PB_PASSWORD"), + PBTargetColl: env("PB_TARGET_COLLECTION", "_dev_telemetry_data"), + + MaxBodyBytes: envInt64("MAX_BODY_BYTES", 1024), + RateLimitRPM: envInt("RATE_LIMIT_RPM", 60), + RateBurst: envInt("RATE_BURST", 20), + RateKeyMode: env("RATE_KEY_MODE", "ip"), // "ip" or "header" + RateKeyHeader: env("RATE_KEY_HEADER", "X-Telemetry-Key"), + RequestTimeout: time.Duration(envInt("UPSTREAM_TIMEOUT_MS", 4000)) * time.Millisecond, + EnableReqLogging: envBool("ENABLE_REQUEST_LOGGING", false), + } + + var pt *ProxyTrust + if strings.TrimSpace(env("TRUSTED_PROXIES_CIDR", "")) != "" { + p, err := NewProxyTrust(cfg.TrustedProxiesCIDR) + if err != nil { + log.Fatalf("invalid TRUSTED_PROXIES_CIDR: %v", err) + } + pt = p + } + + pb := NewPBClient(cfg) + rl := NewRateLimiter(cfg.RateLimitRPM, cfg.RateBurst) + + mux := http.NewServeMux() + + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + _, _ = w.Write([]byte("ok")) + }) + + mux.HandleFunc("/telemetry", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + // rate key: IP or header (header allows non-identifying keys, but header can be abused too) + var key string + switch cfg.RateKeyMode { + case "header": + key = strings.TrimSpace(r.Header.Get(cfg.RateKeyHeader)) + if key == "" { + key = "missing" + } + default: + ip := getClientIP(r, pt) + if ip == nil { + key = "unknown" + } else { + // GDPR: do NOT store IP anywhere permanent; use it only in-memory for RL key + key = ip.String() + } + } + if !rl.Allow(key) { + http.Error(w, "rate limited", http.StatusTooManyRequests) + return + } + + r.Body = http.MaxBytesReader(w, r.Body, cfg.MaxBodyBytes) + raw, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "invalid body", http.StatusBadRequest) + return + } + + // strict JSON decode (no unknown fields) + var in TelemetryIn + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + if err := dec.Decode(&in); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + if err := validate(&in); err != nil { + http.Error(w, "invalid payload", http.StatusBadRequest) + return + } + + now := time.Now().UTC() + out := TelemetryOut{ + Script: in.Script, + Version: in.Version, + Event: in.Event, + OsType: in.OsType, + OsVersion: in.OsVersion, + PveVer: in.PveVer, + Arch: in.Arch, + Method: in.Method, + Status: in.Status, + ExitCode: in.ExitCode, + Error: in.Error, + + TS: now.Unix(), + IngestDay: now.Format("2006-01-02"), + } + out.Hash = computeHash(out) + + ctx, cancel := context.WithTimeout(r.Context(), cfg.RequestTimeout) + defer cancel() + + if err := pb.CreateTelemetry(ctx, out); err != nil { + // GDPR: don't log raw payload, don't log IPs; log only generic error + log.Printf("pocketbase write failed: %v", err) + http.Error(w, "upstream error", http.StatusBadGateway) + return + } + + if cfg.EnableReqLogging { + log.Printf("telemetry accepted script=%s event=%s", out.Script, out.Event) + } + + w.WriteHeader(http.StatusAccepted) + _, _ = w.Write([]byte("accepted")) + }) + + srv := &http.Server{ + Addr: cfg.ListenAddr, + Handler: securityHeaders(mux), + ReadHeaderTimeout: 3 * time.Second, + } + + log.Printf("telemetry-ingest listening on %s", cfg.ListenAddr) + log.Fatal(srv.ListenAndServe()) +} + +func securityHeaders(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Minimal security headers (no cookies anyway) + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("X-Frame-Options", "DENY") + w.Header().Set("Referrer-Policy", "no-referrer") + next.ServeHTTP(w, r) + }) +} + +func env(k, def string) string { + v := os.Getenv(k) + if v == "" { + return def + } + return v +} +func mustEnv(k string) string { + v := os.Getenv(k) + if v == "" { + log.Fatalf("missing env %s", k) + } + return v +} +func envInt(k string, def int) int { + v := os.Getenv(k) + if v == "" { + return def + } + var i int + _, _ = fmt.Sscanf(v, "%d", &i) + if i == 0 && v != "0" { + return def + } + return i +} +func envInt64(k string, def int64) int64 { + v := os.Getenv(k) + if v == "" { + return def + } + var i int64 + _, _ = fmt.Sscanf(v, "%d", &i) + if i == 0 && v != "0" { + return def + } + return i +} +func envBool(k string, def bool) bool { + v := strings.ToLower(strings.TrimSpace(os.Getenv(k))) + if v == "" { + return def + } + return v == "1" || v == "true" || v == "yes" || v == "on" +} +func splitCSV(s string) []string { + s = strings.TrimSpace(s) + if s == "" { + return nil + } + parts := strings.Split(s, ",") + var out []string + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +}