package main import ( "bytes" "context" "crypto/sha256" "encoding/hex" "encoding/json" "errors" "fmt" "io" "log" "net" "net/http" "os" "strings" "sync" "time" ) type Config struct { ListenAddr string TrustedProxiesCIDR []string // PocketBase PBBaseURL string PBAuthCollection string // "_dev_telemetry_service" PBIdentity string // email PBPassword string PBTargetColl string // "_dev_telemetry_data" // Limits MaxBodyBytes int64 RateLimitRPM int // requests per minute per key RateBurst int // burst tokens RateKeyMode string // "ip" or "header" RateKeyHeader string // e.g. "X-Telemetry-Key" RequestTimeout time.Duration // upstream timeout EnableReqLogging bool // default false (GDPR-friendly) } // TelemetryIn matches payload from api.func (bash client) type TelemetryIn struct { // Required RandomID string `json:"random_id"` // Session UUID Type string `json:"type"` // "lxc" or "vm" NSAPP string `json:"nsapp"` // Application name (e.g., "jellyfin") Status string `json:"status"` // "installing", "sucess", "failed", "unknown" // Container/VM specs CTType int `json:"ct_type,omitempty"` // 1=unprivileged, 2=privileged/VM DiskSize int `json:"disk_size,omitempty"` // GB CoreCount int `json:"core_count,omitempty"` // CPU cores RAMSize int `json:"ram_size,omitempty"` // MB // System info OsType string `json:"os_type,omitempty"` // "debian", "ubuntu", "alpine", etc. OsVersion string `json:"os_version,omitempty"` // "12", "24.04", etc. PveVer string `json:"pve_version,omitempty"` // Optional Method string `json:"method,omitempty"` // "default", "advanced" Error string `json:"error,omitempty"` // Error description (max 120 chars) ExitCode int `json:"exit_code,omitempty"` // 0-255 } // TelemetryOut is sent to PocketBase (matches _dev_telemetry_data collection) type TelemetryOut struct { RandomID string `json:"random_id"` Type string `json:"type"` NSAPP string `json:"nsapp"` Status string `json:"status"` CTType int `json:"ct_type,omitempty"` DiskSize int `json:"disk_size,omitempty"` CoreCount int `json:"core_count,omitempty"` RAMSize int `json:"ram_size,omitempty"` OsType string `json:"os_type,omitempty"` OsVersion string `json:"os_version,omitempty"` PveVer string `json:"pve_version,omitempty"` Method string `json:"method,omitempty"` Error string `json:"error,omitempty"` ExitCode int `json:"exit_code,omitempty"` } type PBClient struct { baseURL string authCollection string identity string password string targetColl string mu sync.Mutex token string exp time.Time http *http.Client } func NewPBClient(cfg Config) *PBClient { return &PBClient{ baseURL: strings.TrimRight(cfg.PBBaseURL, "/"), authCollection: cfg.PBAuthCollection, identity: cfg.PBIdentity, password: cfg.PBPassword, targetColl: cfg.PBTargetColl, http: &http.Client{ Timeout: cfg.RequestTimeout, }, } } func (p *PBClient) ensureAuth(ctx context.Context) error { p.mu.Lock() defer p.mu.Unlock() // refresh if token missing or expiring soon if p.token != "" && time.Until(p.exp) > 60*time.Second { return nil } body := map[string]string{ "identity": p.identity, "password": p.password, } b, _ := json.Marshal(body) req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/collections/%s/auth-with-password", p.baseURL, p.authCollection), bytes.NewReader(b), ) if err != nil { return err } req.Header.Set("Content-Type", "application/json") resp, err := p.http.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { rb, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10)) return fmt.Errorf("pocketbase auth failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) } var out struct { Token string `json:"token"` // record omitted } if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { return err } if out.Token == "" { return errors.New("pocketbase auth token missing") } // PocketBase JWT exp can be parsed, but keep it simple: set 50 min p.token = out.Token p.exp = time.Now().Add(50 * time.Minute) return nil } func (p *PBClient) CreateTelemetry(ctx context.Context, payload TelemetryOut) error { if err := p.ensureAuth(ctx); err != nil { return err } b, _ := json.Marshal(payload) req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, p.targetColl), bytes.NewReader(b), ) if err != nil { return err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+p.token) resp, err := p.http.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10)) return fmt.Errorf("pocketbase create failed: %s: %s", resp.Status, strings.TrimSpace(string(rb))) } return nil } // -------- Rate limiter (token bucket / minute window, simple) -------- type bucket struct { tokens int reset time.Time } type RateLimiter struct { mu sync.Mutex buckets map[string]*bucket rpm int burst int window time.Duration cleanInt time.Duration } func NewRateLimiter(rpm, burst int) *RateLimiter { rl := &RateLimiter{ buckets: make(map[string]*bucket), rpm: rpm, burst: burst, window: time.Minute, cleanInt: 5 * time.Minute, } go rl.cleanupLoop() return rl } func (r *RateLimiter) cleanupLoop() { t := time.NewTicker(r.cleanInt) defer t.Stop() for range t.C { now := time.Now() r.mu.Lock() for k, b := range r.buckets { if now.After(b.reset.Add(2 * r.window)) { delete(r.buckets, k) } } r.mu.Unlock() } } func (r *RateLimiter) Allow(key string) bool { if r.rpm <= 0 { return true } now := time.Now() r.mu.Lock() defer r.mu.Unlock() b, ok := r.buckets[key] if !ok || now.After(b.reset) { r.buckets[key] = &bucket{tokens: min(r.burst, r.rpm), reset: now.Add(r.window)} b = r.buckets[key] } if b.tokens <= 0 { return false } b.tokens-- return true } func min(a, b int) int { if a < b { return a } return b } // -------- Utility: GDPR-safe key extraction -------- type ProxyTrust struct { nets []*net.IPNet } func NewProxyTrust(cidrs []string) (*ProxyTrust, error) { var nets []*net.IPNet for _, c := range cidrs { _, n, err := net.ParseCIDR(strings.TrimSpace(c)) if err != nil { return nil, err } nets = append(nets, n) } return &ProxyTrust{nets: nets}, nil } func (pt *ProxyTrust) isTrusted(ip net.IP) bool { for _, n := range pt.nets { if n.Contains(ip) { return true } } return false } func getClientIP(r *http.Request, pt *ProxyTrust) net.IP { // If behind reverse proxy, trust X-Forwarded-For only if remote is trusted proxy. host, _, _ := net.SplitHostPort(r.RemoteAddr) remote := net.ParseIP(host) if remote == nil { return nil } if pt != nil && pt.isTrusted(remote) { xff := r.Header.Get("X-Forwarded-For") if xff != "" { parts := strings.Split(xff, ",") ip := net.ParseIP(strings.TrimSpace(parts[0])) if ip != nil { return ip } } } return remote } // -------- Validation (strict allowlist) -------- var ( // Allowed values for 'type' field allowedType = map[string]bool{"lxc": true, "vm": true} // Allowed values for 'status' field (note: "sucess" is intentional, matches PB schema) allowedStatus = map[string]bool{"installing": true, "sucess": true, "failed": true, "unknown": true} // Allowed values for 'os_type' field allowedOsType = map[string]bool{ "debian": true, "ubuntu": true, "alpine": true, "devuan": true, "fedora": true, "rocky": true, "alma": true, "centos": true, "opensuse": true, "gentoo": true, "openeuler": true, } ) func sanitizeShort(s string, max int) string { s = strings.TrimSpace(s) if s == "" { return "" } // remove line breaks and high-risk chars s = strings.ReplaceAll(s, "\n", " ") s = strings.ReplaceAll(s, "\r", " ") if len(s) > max { s = s[:max] } return s } func validate(in *TelemetryIn) error { // Sanitize all string fields in.RandomID = sanitizeShort(in.RandomID, 64) in.Type = sanitizeShort(in.Type, 8) in.NSAPP = sanitizeShort(in.NSAPP, 64) in.Status = sanitizeShort(in.Status, 16) in.OsType = sanitizeShort(in.OsType, 32) in.OsVersion = sanitizeShort(in.OsVersion, 32) in.PveVer = sanitizeShort(in.PveVer, 32) in.Method = sanitizeShort(in.Method, 32) // IMPORTANT: "error" must be short and not contain identifiers/logs in.Error = sanitizeShort(in.Error, 120) // Required fields if in.RandomID == "" || in.Type == "" || in.NSAPP == "" || in.Status == "" { return errors.New("missing required fields: random_id, type, nsapp, status") } // Validate enums if !allowedType[in.Type] { return errors.New("invalid type (must be 'lxc' or 'vm')") } if !allowedStatus[in.Status] { return errors.New("invalid status") } // os_type is optional but if provided must be valid if in.OsType != "" && !allowedOsType[in.OsType] { return errors.New("invalid os_type") } // method is optional and flexible - just sanitized, no strict validation // Values like "default", "advanced", "mydefaults-global", "mydefaults-app" are all valid // Validate numeric ranges if in.CTType < 0 || in.CTType > 2 { return errors.New("invalid ct_type (must be 0, 1, or 2)") } if in.DiskSize < 0 || in.DiskSize > 100000 { return errors.New("invalid disk_size") } if in.CoreCount < 0 || in.CoreCount > 256 { return errors.New("invalid core_count") } if in.RAMSize < 0 || in.RAMSize > 1048576 { return errors.New("invalid ram_size") } if in.ExitCode < 0 || in.ExitCode > 255 { return errors.New("invalid exit_code") } return nil } // computeHash generates a hash for deduplication (GDPR-safe, no IP) func computeHash(out TelemetryOut) string { key := fmt.Sprintf("%s|%s|%s|%s|%d", out.RandomID, out.NSAPP, out.Type, out.Status, out.ExitCode, ) sum := sha256.Sum256([]byte(key)) return hex.EncodeToString(sum[:]) } // -------- HTTP server -------- func main() { cfg := Config{ ListenAddr: env("LISTEN_ADDR", ":8080"), TrustedProxiesCIDR: splitCSV(env("TRUSTED_PROXIES_CIDR", "")), PBBaseURL: mustEnv("PB_URL"), PBAuthCollection: env("PB_AUTH_COLLECTION", "_dev_telemetry_service"), PBIdentity: mustEnv("PB_IDENTITY"), PBPassword: mustEnv("PB_PASSWORD"), PBTargetColl: env("PB_TARGET_COLLECTION", "_dev_telemetry_data"), MaxBodyBytes: envInt64("MAX_BODY_BYTES", 1024), RateLimitRPM: envInt("RATE_LIMIT_RPM", 60), RateBurst: envInt("RATE_BURST", 20), RateKeyMode: env("RATE_KEY_MODE", "ip"), // "ip" or "header" RateKeyHeader: env("RATE_KEY_HEADER", "X-Telemetry-Key"), RequestTimeout: time.Duration(envInt("UPSTREAM_TIMEOUT_MS", 4000)) * time.Millisecond, EnableReqLogging: envBool("ENABLE_REQUEST_LOGGING", false), } var pt *ProxyTrust if strings.TrimSpace(env("TRUSTED_PROXIES_CIDR", "")) != "" { p, err := NewProxyTrust(cfg.TrustedProxiesCIDR) if err != nil { log.Fatalf("invalid TRUSTED_PROXIES_CIDR: %v", err) } pt = p } pb := NewPBClient(cfg) rl := NewRateLimiter(cfg.RateLimitRPM, cfg.RateBurst) mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) _, _ = w.Write([]byte("ok")) }) mux.HandleFunc("/telemetry", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } // rate key: IP or header (header allows non-identifying keys, but header can be abused too) var key string switch cfg.RateKeyMode { case "header": key = strings.TrimSpace(r.Header.Get(cfg.RateKeyHeader)) if key == "" { key = "missing" } default: ip := getClientIP(r, pt) if ip == nil { key = "unknown" } else { // GDPR: do NOT store IP anywhere permanent; use it only in-memory for RL key key = ip.String() } } if !rl.Allow(key) { http.Error(w, "rate limited", http.StatusTooManyRequests) return } r.Body = http.MaxBytesReader(w, r.Body, cfg.MaxBodyBytes) raw, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "invalid body", http.StatusBadRequest) return } // strict JSON decode (no unknown fields) var in TelemetryIn dec := json.NewDecoder(bytes.NewReader(raw)) dec.DisallowUnknownFields() if err := dec.Decode(&in); err != nil { http.Error(w, "invalid json", http.StatusBadRequest) return } if err := validate(&in); err != nil { http.Error(w, "invalid payload", http.StatusBadRequest) return } // Map input to PocketBase schema out := TelemetryOut{ RandomID: in.RandomID, Type: in.Type, NSAPP: in.NSAPP, Status: in.Status, CTType: in.CTType, DiskSize: in.DiskSize, CoreCount: in.CoreCount, RAMSize: in.RAMSize, OsType: in.OsType, OsVersion: in.OsVersion, PveVer: in.PveVer, Method: in.Method, Error: in.Error, ExitCode: in.ExitCode, } _ = computeHash(out) // For future deduplication ctx, cancel := context.WithTimeout(r.Context(), cfg.RequestTimeout) defer cancel() if err := pb.CreateTelemetry(ctx, out); err != nil { // GDPR: don't log raw payload, don't log IPs; log only generic error log.Printf("pocketbase write failed: %v", err) http.Error(w, "upstream error", http.StatusBadGateway) return } if cfg.EnableReqLogging { log.Printf("telemetry accepted nsapp=%s status=%s", out.NSAPP, out.Status) } w.WriteHeader(http.StatusAccepted) _, _ = w.Write([]byte("accepted")) }) srv := &http.Server{ Addr: cfg.ListenAddr, Handler: securityHeaders(mux), ReadHeaderTimeout: 3 * time.Second, } log.Printf("telemetry-ingest listening on %s", cfg.ListenAddr) log.Fatal(srv.ListenAndServe()) } func securityHeaders(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Minimal security headers (no cookies anyway) w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("X-Frame-Options", "DENY") w.Header().Set("Referrer-Policy", "no-referrer") next.ServeHTTP(w, r) }) } func env(k, def string) string { v := os.Getenv(k) if v == "" { return def } return v } func mustEnv(k string) string { v := os.Getenv(k) if v == "" { log.Fatalf("missing env %s", k) } return v } func envInt(k string, def int) int { v := os.Getenv(k) if v == "" { return def } var i int _, _ = fmt.Sscanf(v, "%d", &i) if i == 0 && v != "0" { return def } return i } func envInt64(k string, def int64) int64 { v := os.Getenv(k) if v == "" { return def } var i int64 _, _ = fmt.Sscanf(v, "%d", &i) if i == 0 && v != "0" { return def } return i } func envBool(k string, def bool) bool { v := strings.ToLower(strings.TrimSpace(os.Getenv(k))) if v == "" { return def } return v == "1" || v == "true" || v == "yes" || v == "on" } func splitCSV(s string) []string { s = strings.TrimSpace(s) if s == "" { return nil } parts := strings.Split(s, ",") var out []string for _, p := range parts { p = strings.TrimSpace(p) if p != "" { out = append(out, p) } } return out }