ProxmoxVED/misc/data/service.go
CanbiZ (MickLesk) d32b00ff31 Add weekly reports, cleanup, and dashboard UI
Introduce weekly summary reports and a cleanup job, enhance dashboard UI, and adjust telemetry/build settings.

- Add REPO_SOURCE to misc/api.func and include repo_source in telemetry payloads.
- Implement weekly report generation/scheduling in alerts.go: new data types, HTML/plain templates, scheduler, SendWeeklyReport/TestWeeklyReport, and email/HTML helpers.
- Add Cleaner (misc/data/cleanup.go) to detect and mark stuck installations as 'unknown' with scheduling and manual trigger APIs.
- Enhance dashboard backend/frontend (misc/data/dashboard.go): optional days filter (allow 'All'), increase fetch page size, simplify fetchRecords, add quick filter buttons, detail & health modals, improved styles and chart options, and client-side record detail view.
- Update Dockerfile (misc/data/Dockerfile): rename binaries to telemetry-service and build migrate from ./migration/migrate.go; copy adjusted in final image.
- Add migration tooling (misc/data/migration/migrate.sh and migration.go) and other small service changes.

These changes add operational reporting and cleanup capabilities, improve observability and UX of the dashboard, and align build and telemetry identifiers for the service.
2026-02-11 12:19:30 +01:00

1197 lines
35 KiB
Go

package main
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
)
// Config holds the service settings. main fills it from environment
// variables (LISTEN_ADDR, PB_URL, RATE_LIMIT_RPM, SMTP_HOST, ...); the
// defaults are defined there, not here.
type Config struct {
// ListenAddr is the address the HTTP server binds to.
ListenAddr string
// TrustedProxiesCIDR lists reverse-proxy networks whose X-Forwarded-For
// header is honored when deriving the client IP used for rate limiting.
TrustedProxiesCIDR []string
// PocketBase
PBBaseURL string
PBAuthCollection string // "_dev_telemetry_service"
PBIdentity string // email
PBPassword string
PBTargetColl string // "_dev_telemetry_data" (dev default)
PBLiveTargetColl string // "_live_telemetry_data" (production)
// Limits
MaxBodyBytes int64 // cap on the /telemetry request body (http.MaxBytesReader)
RateLimitRPM int // requests per minute per key
RateBurst int // burst tokens
RateKeyMode string // "ip" or "header"
RateKeyHeader string // e.g. "X-Telemetry-Key"
RequestTimeout time.Duration // upstream timeout
EnableReqLogging bool // default false (GDPR-friendly)
// Cache
RedisURL string
EnableRedis bool
CacheTTL time.Duration
CacheEnabled bool // gates cache reads/writes in the /api/dashboard handler
// Alerts (SMTP)
AlertEnabled bool
SMTPHost string
SMTPPort int
SMTPUser string
SMTPPassword string
SMTPFrom string
SMTPTo []string
SMTPUseTLS bool
// AlertFailureThreshold is handed to the alerter as FailureThreshold;
// presumably a failure-rate percentage (default 20.0) — confirm in alerts.go.
AlertFailureThreshold float64
AlertCheckInterval time.Duration
AlertCooldown time.Duration
}
// TelemetryIn matches payload from api.func (bash client).
// The /telemetry handler decodes it with DisallowUnknownFields, so any field
// not listed here makes the request fail, and then runs validate on it, which
// sanitizes the strings and checks enums and numeric ranges in place.
type TelemetryIn struct {
// Required
RandomID string `json:"random_id"` // Session UUID
Type string `json:"type"` // "lxc", "vm", "tool", "addon"
NSAPP string `json:"nsapp"` // Application name (e.g., "jellyfin")
Status string `json:"status"` // "installing", "success", "failed", "unknown"
// Container/VM specs
CTType int `json:"ct_type,omitempty"` // 1=unprivileged, 2=privileged/VM
DiskSize int `json:"disk_size,omitempty"` // GB
CoreCount int `json:"core_count,omitempty"` // CPU cores
RAMSize int `json:"ram_size,omitempty"` // MB
// System info
OsType string `json:"os_type,omitempty"` // "debian", "ubuntu", "alpine", etc.
OsVersion string `json:"os_version,omitempty"` // "12", "24.04", etc.
PveVer string `json:"pve_version,omitempty"`
// Optional
Method string `json:"method,omitempty"` // "default", "advanced"
Error string `json:"error,omitempty"` // Error description (truncated to 120 bytes by validate)
ExitCode int `json:"exit_code,omitempty"` // 0-255
// === EXTENDED FIELDS ===
// GPU Passthrough stats
GPUVendor string `json:"gpu_vendor,omitempty"` // "intel", "amd", "nvidia"
GPUModel string `json:"gpu_model,omitempty"` // e.g., "Intel Arc Graphics"
GPUPassthrough string `json:"gpu_passthrough,omitempty"` // "igpu", "dgpu", "vgpu", "none"
// CPU stats
CPUVendor string `json:"cpu_vendor,omitempty"` // "intel", "amd", "arm"
CPUModel string `json:"cpu_model,omitempty"` // e.g., "Intel Core Ultra 7 155H"
// RAM stats
RAMSpeed string `json:"ram_speed,omitempty"` // e.g., "4800" (MT/s)
// Performance metrics
InstallDuration int `json:"install_duration,omitempty"` // Seconds
// Error categorization
ErrorCategory string `json:"error_category,omitempty"` // "network", "storage", "dependency", "permission", "timeout", "unknown"
// Repository source for collection routing. It is not copied into
// TelemetryOut; it only selects the target collection.
RepoSource string `json:"repo_source,omitempty"` // "community-scripts/ProxmoxVE" or "community-scripts/ProxmoxVED"
}
// TelemetryOut is the record body created in PocketBase. It is written to
// whichever collection resolveCollection picks (dev or live), so both
// collections are expected to share this schema. repo_source is deliberately
// absent: it only routes the write and is not stored.
type TelemetryOut struct {
RandomID string `json:"random_id"`
Type string `json:"type"`
NSAPP string `json:"nsapp"`
Status string `json:"status"`
CTType int `json:"ct_type,omitempty"`
DiskSize int `json:"disk_size,omitempty"`
CoreCount int `json:"core_count,omitempty"`
RAMSize int `json:"ram_size,omitempty"`
OsType string `json:"os_type,omitempty"`
OsVersion string `json:"os_version,omitempty"`
PveVer string `json:"pve_version,omitempty"`
Method string `json:"method,omitempty"`
Error string `json:"error,omitempty"`
ExitCode int `json:"exit_code,omitempty"`
// Extended fields
GPUVendor string `json:"gpu_vendor,omitempty"`
GPUModel string `json:"gpu_model,omitempty"`
GPUPassthrough string `json:"gpu_passthrough,omitempty"`
CPUVendor string `json:"cpu_vendor,omitempty"`
CPUModel string `json:"cpu_model,omitempty"`
RAMSpeed string `json:"ram_speed,omitempty"`
InstallDuration int `json:"install_duration,omitempty"`
ErrorCategory string `json:"error_category,omitempty"`
}
// TelemetryStatusUpdate contains only fields needed for status updates.
// It is the PATCH body sent for a follow-up report on an existing record.
// Unlike the other fields, ExitCode has no omitempty, so an exit code of 0 is
// always sent instead of being left out of the PATCH.
type TelemetryStatusUpdate struct {
Status string `json:"status"`
Error string `json:"error,omitempty"`
ExitCode int `json:"exit_code"`
InstallDuration int `json:"install_duration,omitempty"`
ErrorCategory string `json:"error_category,omitempty"`
GPUVendor string `json:"gpu_vendor,omitempty"`
GPUModel string `json:"gpu_model,omitempty"`
GPUPassthrough string `json:"gpu_passthrough,omitempty"`
CPUVendor string `json:"cpu_vendor,omitempty"`
CPUModel string `json:"cpu_model,omitempty"`
RAMSpeed string `json:"ram_speed,omitempty"`
}
// Allowed values for 'repo_source' field — controls collection routing.
// validate rejects any other non-empty value; an empty repo_source is allowed
// and is routed to the dev collection.
var allowedRepoSource = map[string]bool{
"community-scripts/ProxmoxVE": true,
"community-scripts/ProxmoxVED": true,
}
// PBClient is a minimal PocketBase REST client. It authenticates against
// authCollection with identity/password and caches the resulting token until
// shortly before exp (see ensureAuth).
type PBClient struct {
baseURL string // no trailing slash (NewPBClient trims it)
authCollection string
identity string
password string
devColl string // "_dev_telemetry_data"
liveColl string // "_live_telemetry_data"
mu sync.Mutex // held by ensureAuth while it checks/refreshes token and exp
token string
exp time.Time
http *http.Client
}
// NewPBClient builds a PocketBase client from cfg. Trailing slashes are
// stripped from the base URL so API paths can be appended directly, and every
// request made through the client is bounded by cfg.RequestTimeout.
func NewPBClient(cfg Config) *PBClient {
	httpClient := &http.Client{Timeout: cfg.RequestTimeout}
	return &PBClient{
		baseURL:        strings.TrimRight(cfg.PBBaseURL, "/"),
		authCollection: cfg.PBAuthCollection,
		identity:       cfg.PBIdentity,
		password:       cfg.PBPassword,
		devColl:        cfg.PBTargetColl,
		liveColl:       cfg.PBLiveTargetColl,
		http:           httpClient,
	}
}
// resolveCollection picks the PocketBase collection for a repo_source value.
// Only "community-scripts/ProxmoxVE" goes to the live collection, and only
// when one is configured; every other value, including "" and the dev repo
// "community-scripts/ProxmoxVED", lands in the dev collection.
func (p *PBClient) resolveCollection(repoSource string) string {
	isLiveRepo := repoSource == "community-scripts/ProxmoxVE"
	if !isLiveRepo || p.liveColl == "" {
		return p.devColl
	}
	return p.liveColl
}
// ensureAuth makes sure p.token holds a PocketBase auth token that stays valid
// for more than another minute. When the token is missing or about to expire,
// it logs in via auth-with-password on p.authCollection.
//
// p.mu is held for the whole call, so concurrent callers wait for one
// in-flight login instead of each performing their own.
func (p *PBClient) ensureAuth(ctx context.Context) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	stillValid := p.token != "" && time.Until(p.exp) > 60*time.Second
	if stillValid {
		return nil
	}

	creds, _ := json.Marshal(map[string]string{
		"identity": p.identity,
		"password": p.password,
	})
	authURL := fmt.Sprintf("%s/api/collections/%s/auth-with-password", p.baseURL, p.authCollection)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, authURL, bytes.NewReader(creds))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.http.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if ok := resp.StatusCode >= 200 && resp.StatusCode < 300; !ok {
		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 4<<10))
		return fmt.Errorf("pocketbase auth failed: %s: %s", resp.Status, strings.TrimSpace(string(snippet)))
	}

	var payload struct {
		Token string `json:"token"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return err
	}
	if payload.Token == "" {
		return errors.New("pocketbase auth token missing")
	}

	// The JWT carries its own exp claim; instead of parsing it, assume a
	// fixed 50-minute lifetime.
	p.token, p.exp = payload.Token, time.Now().Add(50*time.Minute)
	return nil
}
// FindRecordByRandomID looks up the record whose random_id equals randomID in
// collection coll. It returns that record's PocketBase ID, or "" with a nil
// error when no record matches.
//
// randomID comes from the client payload, so it is embedded in the filter as
// a quoted literal with single quotes backslash-escaped. PocketBase's filter
// parser treats \' as an escaped quote, so the value cannot end the literal
// early. The query string is built with url.Values so that characters such as
// '&', '#', '+' or spaces cannot corrupt the request URL.
func (p *PBClient) FindRecordByRandomID(ctx context.Context, coll, randomID string) (string, error) {
	if err := p.ensureAuth(ctx); err != nil {
		return "", err
	}
	// Read the token under the lock: ensureAuth may rewrite it concurrently.
	p.mu.Lock()
	token := p.token
	p.mu.Unlock()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, coll),
		nil,
	)
	if err != nil {
		return "", err
	}
	q := req.URL.Query()
	q.Set("filter", "random_id='"+strings.ReplaceAll(randomID, "'", `\'`)+"'")
	q.Set("fields", "id")
	q.Set("perPage", "1")
	req.URL.RawQuery = q.Encode()
	req.Header.Set("Authorization", "Bearer "+token)

	resp, err := p.http.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return "", fmt.Errorf("pocketbase search failed: %s", resp.Status)
	}
	var result struct {
		Items []struct {
			ID string `json:"id"`
		} `json:"items"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	if len(result.Items) == 0 {
		return "", nil // Not found
	}
	return result.Items[0].ID, nil
}
// UpdateTelemetryStatus PATCHes record recordID in collection coll with the
// fields carried by update (status, error, exit_code and the follow-up
// metrics). A non-2xx response is returned as an error that includes up to
// 8 KiB of the PocketBase response body.
func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, coll, recordID string, update TelemetryStatusUpdate) error {
	if err := p.ensureAuth(ctx); err != nil {
		return err
	}
	// Read the token under the lock: ensureAuth may rewrite it concurrently.
	p.mu.Lock()
	token := p.token
	p.mu.Unlock()

	b, err := json.Marshal(update)
	if err != nil {
		return fmt.Errorf("encoding status update: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPatch,
		fmt.Sprintf("%s/api/collections/%s/records/%s", p.baseURL, coll, recordID),
		bytes.NewReader(b),
	)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := p.http.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10))
		return fmt.Errorf("pocketbase update failed: %s: %s", resp.Status, strings.TrimSpace(string(rb)))
	}
	return nil
}
// FetchRecordsPaginated returns one page of records from the dev collection
// (p.devColl), together with PocketBase's total item count.
// Dashboard shows dev data; for live data, use a separate endpoint if needed.
//
// status and osType are exact-match filters and app is a PocketBase "~"
// (contains) filter; empty values are ignored. sortField must be one of the
// whitelisted fields, optionally prefixed with "-" for descending order.
// Anything else falls back to "-created".
//
// The filter values arrive unvalidated from the /api/records query string, so
// each one is embedded as a quoted literal with single quotes backslash-escaped.
// The query string is built with url.Values: a raw "&&" in the URL would be
// split by the server's query parser, silently dropping every filter after
// the first.
func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType, sortField string) ([]TelemetryRecord, int, error) {
	if err := p.ensureAuth(ctx); err != nil {
		return nil, 0, err
	}
	// Read the token under the lock: ensureAuth may rewrite it concurrently.
	p.mu.Lock()
	token := p.token
	p.mu.Unlock()

	// quote turns an arbitrary string into a PocketBase filter string literal.
	quote := func(v string) string {
		return "'" + strings.ReplaceAll(v, "'", `\'`) + "'"
	}
	var filters []string
	if status != "" {
		filters = append(filters, "status="+quote(status))
	}
	if app != "" {
		filters = append(filters, "nsapp~"+quote(app))
	}
	if osType != "" {
		filters = append(filters, "os_type="+quote(osType))
	}

	// Whitelist the sort field (with or without a leading "-") so that
	// arbitrary expressions cannot be injected via the sort parameter.
	sort := "-created"
	switch strings.TrimPrefix(sortField, "-") {
	case "created", "nsapp", "status", "os_type", "type", "method", "exit_code":
		sort = sortField
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, p.devColl), nil)
	if err != nil {
		return nil, 0, err
	}
	q := req.URL.Query()
	q.Set("sort", sort)
	q.Set("page", fmt.Sprint(page))
	q.Set("perPage", fmt.Sprint(limit))
	if len(filters) > 0 {
		q.Set("filter", strings.Join(filters, " && "))
	}
	req.URL.RawQuery = q.Encode()
	req.Header.Set("Authorization", "Bearer "+token)

	resp, err := p.http.Do(req)
	if err != nil {
		return nil, 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, 0, fmt.Errorf("pocketbase fetch failed: %s", resp.Status)
	}
	var result struct {
		Items      []TelemetryRecord `json:"items"`
		TotalItems int               `json:"totalItems"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, 0, err
	}
	return result.Items, result.TotalItems, nil
}
// UpsertTelemetry stores one telemetry report in the collection chosen by
// resolveCollection(repoSource): "community-scripts/ProxmoxVE" maps to the
// live collection, and everything else maps to the dev collection.
//
// An "installing" report always creates a new record. Any other status is a
// follow-up: the record with the same random_id is patched with the status,
// error and metric fields. If no such record exists, the whole payload is
// created instead. A failed lookup is returned wrapped.
func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut, repoSource string) error {
	target := p.resolveCollection(repoSource)

	if payload.Status == "installing" {
		return p.CreateTelemetry(ctx, target, payload)
	}

	existingID, lookupErr := p.FindRecordByRandomID(ctx, target, payload.RandomID)
	switch {
	case lookupErr != nil:
		return fmt.Errorf("cannot find record to update: %w", lookupErr)
	case existingID == "":
		// The matching "installing" report never arrived; keep the data anyway.
		return p.CreateTelemetry(ctx, target, payload)
	}

	patch := TelemetryStatusUpdate{
		Status:          payload.Status,
		Error:           payload.Error,
		ExitCode:        payload.ExitCode,
		InstallDuration: payload.InstallDuration,
		ErrorCategory:   payload.ErrorCategory,
		GPUVendor:       payload.GPUVendor,
		GPUModel:        payload.GPUModel,
		GPUPassthrough:  payload.GPUPassthrough,
		CPUVendor:       payload.CPUVendor,
		CPUModel:        payload.CPUModel,
		RAMSpeed:        payload.RAMSpeed,
	}
	return p.UpdateTelemetryStatus(ctx, target, existingID, patch)
}
// CreateTelemetry inserts payload as a new record in collection coll.
// A non-2xx response is returned as an error that includes up to 8 KiB of
// the PocketBase response body.
func (p *PBClient) CreateTelemetry(ctx context.Context, coll string, payload TelemetryOut) error {
	if err := p.ensureAuth(ctx); err != nil {
		return err
	}
	// Read the token under the lock: ensureAuth may rewrite it concurrently.
	p.mu.Lock()
	token := p.token
	p.mu.Unlock()

	b, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("encoding telemetry record: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, coll),
		bytes.NewReader(b),
	)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := p.http.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		rb, _ := io.ReadAll(io.LimitReader(resp.Body, 8<<10))
		return fmt.Errorf("pocketbase create failed: %s: %s", resp.Status, strings.TrimSpace(string(rb)))
	}
	return nil
}
// -------- Rate limiter (token bucket / minute window, simple) --------
// bucket is one key's rate-limit state: the tokens left in the current window
// and the instant at which the window ends and Allow refills the bucket.
type bucket struct {
tokens int
reset time.Time
}
// RateLimiter is a per-key fixed-window limiter (see Allow). cleanupLoop runs
// every cleanInt and removes buckets whose window ended more than two windows ago.
type RateLimiter struct {
mu sync.Mutex // guards buckets
buckets map[string]*bucket
rpm int // requests per window; <= 0 disables limiting
burst int // caps the per-window allowance (see Allow)
window time.Duration // window length (one minute from NewRateLimiter)
cleanInt time.Duration // sweep interval for cleanupLoop
}
// NewRateLimiter returns a limiter with one-minute windows and starts its
// background sweeper, which runs every five minutes. The sweeper has no stop
// mechanism and lives for the rest of the process.
func NewRateLimiter(rpm, burst int) *RateLimiter {
	limiter := &RateLimiter{
		rpm:      rpm,
		burst:    burst,
		window:   time.Minute,
		cleanInt: 5 * time.Minute,
		buckets:  map[string]*bucket{},
	}
	go limiter.cleanupLoop()
	return limiter
}
// cleanupLoop sweeps stale buckets every r.cleanInt, forever. A bucket is
// removed once its window ended more than two windows ago. Such a key would
// get a fresh bucket from Allow anyway, so dropping it only frees memory.
func (r *RateLimiter) cleanupLoop() {
	ticker := time.NewTicker(r.cleanInt)
	defer ticker.Stop()
	for range ticker.C {
		cutoff := time.Now().Add(-2 * r.window)
		r.mu.Lock()
		for key, b := range r.buckets {
			if b.reset.Before(cutoff) {
				delete(r.buckets, key)
			}
		}
		r.mu.Unlock()
	}
}
// Allow reports whether a request identified by key may proceed. When it
// may, Allow consumes one token from the key's bucket.
//
// Each key gets a fixed window (r.window). A window holds min(burst, rpm)
// tokens and is refilled only after it expires; this is not a continuously
// refilling token bucket. rpm <= 0 disables limiting. burst <= 0 means no
// separate burst cap, so the window allows rpm requests. Previously such a
// burst produced a zero-token bucket that rejected every request.
func (r *RateLimiter) Allow(key string) bool {
	if r.rpm <= 0 {
		return true
	}
	capacity := r.rpm
	if r.burst > 0 && r.burst < capacity {
		capacity = r.burst
	}

	now := time.Now()
	r.mu.Lock()
	defer r.mu.Unlock()
	b, ok := r.buckets[key]
	if !ok || now.After(b.reset) {
		b = &bucket{tokens: capacity, reset: now.Add(r.window)}
		r.buckets[key] = b
	}
	if b.tokens <= 0 {
		return false
	}
	b.tokens--
	return true
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// -------- Utility: GDPR-safe key extraction --------
// ProxyTrust holds the networks of reverse proxies whose X-Forwarded-For
// header may be believed when deriving a client IP.
type ProxyTrust struct {
	nets []*net.IPNet
}

// NewProxyTrust parses cidrs into a ProxyTrust, ignoring surrounding
// whitespace. It fails on the first entry that is not valid CIDR notation.
func NewProxyTrust(cidrs []string) (*ProxyTrust, error) {
	var nets []*net.IPNet
	for _, c := range cidrs {
		_, n, err := net.ParseCIDR(strings.TrimSpace(c))
		if err != nil {
			return nil, err
		}
		nets = append(nets, n)
	}
	return &ProxyTrust{nets: nets}, nil
}

// isTrusted reports whether ip lies inside any of the trusted networks.
func (pt *ProxyTrust) isTrusted(ip net.IP) bool {
	for _, n := range pt.nets {
		if n.Contains(ip) {
			return true
		}
	}
	return false
}

// getClientIP returns the client IP used as the rate-limit key for r, or nil
// when r.RemoteAddr is not an IP address. The IP is used only in memory.
//
// X-Forwarded-For is consulted only when the direct peer is a trusted proxy.
// The header (all of its lines, joined) is then walked from right to left.
// Every proxy appends the address it received the request from, so the
// rightmost entry that is not itself a trusted proxy is the real client as
// far as this deployment can tell. The leftmost entries are supplied by the
// client and can be forged, so using them would let anyone bypass rate limits
// by rotating the header. The walk stops at the first malformed entry. If
// every entry is trusted, the leftmost one is returned.
func getClientIP(r *http.Request, pt *ProxyTrust) net.IP {
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		// RemoteAddr without a port: try it as a bare IP.
		host = r.RemoteAddr
	}
	remote := net.ParseIP(host)
	if remote == nil {
		return nil
	}
	if pt == nil || !pt.isTrusted(remote) {
		return remote
	}

	hops := strings.Split(strings.Join(r.Header.Values("X-Forwarded-For"), ","), ",")
	client := remote
	for i := len(hops) - 1; i >= 0; i-- {
		ip := net.ParseIP(strings.TrimSpace(hops[i]))
		if ip == nil {
			break
		}
		client = ip
		if !pt.isTrusted(ip) {
			break
		}
	}
	return client
}
// -------- Validation (strict allowlist) --------
// Enum allowlists used by validate. validate replaces empty gpu_vendor,
// gpu_passthrough and cpu_vendor values with "unknown" before checking, so
// the "" entries in those three sets are never hit. error_category has no
// such default, so its "" entry is what allows the field to be omitted.
var (
// Allowed values for 'type' field
allowedType = map[string]bool{"lxc": true, "vm": true, "tool": true, "addon": true}
// Allowed values for 'status' field
allowedStatus = map[string]bool{"installing": true, "success": true, "failed": true, "unknown": true}
// Allowed values for 'os_type' field
allowedOsType = map[string]bool{
"debian": true, "ubuntu": true, "alpine": true, "devuan": true,
"fedora": true, "rocky": true, "alma": true, "centos": true,
"opensuse": true, "gentoo": true, "openeuler": true,
}
// Allowed values for 'gpu_vendor' field
allowedGPUVendor = map[string]bool{"intel": true, "amd": true, "nvidia": true, "unknown": true, "": true}
// Allowed values for 'gpu_passthrough' field
allowedGPUPassthrough = map[string]bool{"igpu": true, "dgpu": true, "vgpu": true, "none": true, "unknown": true, "": true}
// Allowed values for 'cpu_vendor' field
allowedCPUVendor = map[string]bool{"intel": true, "amd": true, "arm": true, "apple": true, "qualcomm": true, "unknown": true, "": true}
// Allowed values for 'error_category' field
allowedErrorCategory = map[string]bool{
"network": true, "storage": true, "dependency": true, "permission": true,
"timeout": true, "config": true, "resource": true, "unknown": true, "": true,
}
)
// sanitizeShort trims surrounding whitespace, replaces CR and LF with spaces
// so the value stays on one line, and caps the result at max bytes.
//
// Truncation never splits a multi-byte UTF-8 sequence. A partial rune at the
// cut is dropped, so the result is always valid UTF-8 when the input is, and
// may be a few bytes shorter than max. A non-positive max yields "" (a
// negative max used to panic).
func sanitizeShort(s string, max int) string {
	s = strings.TrimSpace(s)
	if s == "" || max <= 0 {
		return ""
	}
	s = strings.ReplaceAll(s, "\n", " ")
	s = strings.ReplaceAll(s, "\r", " ")
	if len(s) > max {
		cut := max
		// Bytes of the form 10xxxxxx are UTF-8 continuation bytes; back up
		// until the cut lands on the first byte of a rune.
		for cut > 0 && s[cut]&0xC0 == 0x80 {
			cut--
		}
		s = s[:cut]
	}
	return s
}
// validate sanitizes in in place and checks it against the allowlists,
// returning the first violation found (nil when the payload is acceptable).
//
// Steps, in order:
//   - every string field is trimmed, made single-line and length-capped by
//     sanitizeShort; the vendor, passthrough and error-category values are
//     also lower-cased;
//   - empty gpu_vendor, gpu_passthrough and cpu_vendor become "unknown";
//   - the required fields and the enums are checked. "sucess" is accepted as
//     "success". type, status and os_type are compared case-sensitively;
//   - numeric fields are range-checked, and repo_source must be empty or an
//     allowed repository.
func validate(in *TelemetryIn) error {
// Sanitize all string fields
in.RandomID = sanitizeShort(in.RandomID, 64)
in.Type = sanitizeShort(in.Type, 8)
in.NSAPP = sanitizeShort(in.NSAPP, 64)
in.Status = sanitizeShort(in.Status, 16)
in.OsType = sanitizeShort(in.OsType, 32)
in.OsVersion = sanitizeShort(in.OsVersion, 32)
in.PveVer = sanitizeShort(in.PveVer, 32)
in.Method = sanitizeShort(in.Method, 32)
// Sanitize extended fields
in.GPUVendor = strings.ToLower(sanitizeShort(in.GPUVendor, 16))
in.GPUModel = sanitizeShort(in.GPUModel, 64)
in.GPUPassthrough = strings.ToLower(sanitizeShort(in.GPUPassthrough, 16))
in.CPUVendor = strings.ToLower(sanitizeShort(in.CPUVendor, 16))
in.CPUModel = sanitizeShort(in.CPUModel, 64)
in.RAMSpeed = sanitizeShort(in.RAMSpeed, 16)
in.ErrorCategory = strings.ToLower(sanitizeShort(in.ErrorCategory, 32))
// Sanitize repo_source (routing field)
in.RepoSource = sanitizeShort(in.RepoSource, 64)
// Default empty values to "unknown" for consistency
if in.GPUVendor == "" {
in.GPUVendor = "unknown"
}
if in.GPUPassthrough == "" {
in.GPUPassthrough = "unknown"
}
if in.CPUVendor == "" {
in.CPUVendor = "unknown"
}
// IMPORTANT: "error" must be short and not contain identifiers/logs
in.Error = sanitizeShort(in.Error, 120)
// Required fields for all requests
if in.RandomID == "" || in.Type == "" || in.NSAPP == "" || in.Status == "" {
return errors.New("missing required fields: random_id, type, nsapp, status")
}
// Normalize common typos for backwards compatibility
if in.Status == "sucess" {
in.Status = "success"
}
// Validate enums
if !allowedType[in.Type] {
return errors.New("invalid type (must be 'lxc', 'vm', 'tool', or 'addon')")
}
if !allowedStatus[in.Status] {
return errors.New("invalid status")
}
// Validate new enum fields
if !allowedGPUVendor[in.GPUVendor] {
return errors.New("invalid gpu_vendor (must be 'intel', 'amd', 'nvidia', 'unknown')")
}
if !allowedGPUPassthrough[in.GPUPassthrough] {
return errors.New("invalid gpu_passthrough (must be 'igpu', 'dgpu', 'vgpu', 'none', 'unknown')")
}
if !allowedCPUVendor[in.CPUVendor] {
return errors.New("invalid cpu_vendor (must be 'intel', 'amd', 'arm', 'apple', 'qualcomm', 'unknown')")
}
if !allowedErrorCategory[in.ErrorCategory] {
return errors.New("invalid error_category")
}
// isUpdate is true for every status other than "installing". It only relaxes
// the ct_type check below; all other numeric range checks apply to every request.
isUpdate := in.Status != "installing"
// os_type is optional but if provided must be valid (only for lxc/vm)
if (in.Type == "lxc" || in.Type == "vm") && in.OsType != "" && !allowedOsType[in.OsType] {
return errors.New("invalid os_type")
}
// method is optional and flexible - just sanitized, no strict validation
// Values like "default", "advanced", "mydefaults-global", "mydefaults-app" are all valid
// ct_type is range-checked only when creating an lxc/vm record
if !isUpdate && (in.Type == "lxc" || in.Type == "vm") {
if in.CTType < 0 || in.CTType > 2 {
return errors.New("invalid ct_type (must be 0, 1, or 2)")
}
}
if in.DiskSize < 0 || in.DiskSize > 100000 {
return errors.New("invalid disk_size")
}
if in.CoreCount < 0 || in.CoreCount > 256 {
return errors.New("invalid core_count")
}
if in.RAMSize < 0 || in.RAMSize > 1048576 {
return errors.New("invalid ram_size")
}
if in.ExitCode < 0 || in.ExitCode > 255 {
return errors.New("invalid exit_code")
}
if in.InstallDuration < 0 || in.InstallDuration > 86400 {
return errors.New("invalid install_duration (max 24h)")
}
// Validate repo_source: must be an allowed repository or empty
if in.RepoSource != "" && !allowedRepoSource[in.RepoSource] {
return errors.New("invalid repo_source (must be 'community-scripts/ProxmoxVE' or 'community-scripts/ProxmoxVED')")
}
return nil
}
// computeHash returns a hex-encoded SHA-256 fingerprint of the fields that
// identify one telemetry event: random_id, nsapp, type, status and exit_code,
// joined with "|". It is intended for deduplication and includes no client IP
// or other network identifiers.
func computeHash(out TelemetryOut) string {
	parts := []string{out.RandomID, out.NSAPP, out.Type, out.Status, fmt.Sprint(out.ExitCode)}
	digest := sha256.Sum256([]byte(strings.Join(parts, "|")))
	return hex.EncodeToString(digest[:])
}
// -------- HTTP server --------
// main builds the Config from environment variables and wires up the
// PocketBase client, rate limiter, cache and alerter. It then registers the
// HTTP routes and serves until ListenAndServe fails.
//
// Routes: /healthz, / (dashboard HTML), /dashboard (redirect to /),
// /metrics, /api/dashboard, /api/records, /api/alerts, /api/alerts/test
// (POST) and /telemetry (POST, the ingest endpoint used by the bash client).
func main() {
	cfg := Config{
		ListenAddr:         env("LISTEN_ADDR", ":8080"),
		TrustedProxiesCIDR: splitCSV(env("TRUSTED_PROXIES_CIDR", "")),
		PBBaseURL:          mustEnv("PB_URL"),
		PBAuthCollection:   env("PB_AUTH_COLLECTION", "_dev_telemetry_service"),
		PBIdentity:         mustEnv("PB_IDENTITY"),
		PBPassword:         mustEnv("PB_PASSWORD"),
		PBTargetColl:       env("PB_TARGET_COLLECTION", "_dev_telemetry_data"),
		PBLiveTargetColl:   env("PB_LIVE_TARGET_COLLECTION", "_live_telemetry_data"),
		MaxBodyBytes:       envInt64("MAX_BODY_BYTES", 1024),
		RateLimitRPM:       envInt("RATE_LIMIT_RPM", 60),
		RateBurst:          envInt("RATE_BURST", 20),
		RateKeyMode:        env("RATE_KEY_MODE", "ip"), // "ip" or "header"
		RateKeyHeader:      env("RATE_KEY_HEADER", "X-Telemetry-Key"),
		RequestTimeout:     time.Duration(envInt("UPSTREAM_TIMEOUT_MS", 4000)) * time.Millisecond,
		EnableReqLogging:   envBool("ENABLE_REQUEST_LOGGING", false),
		// Cache config
		RedisURL:     env("REDIS_URL", ""),
		EnableRedis:  envBool("ENABLE_REDIS", false),
		CacheTTL:     time.Duration(envInt("CACHE_TTL_SECONDS", 60)) * time.Second,
		CacheEnabled: envBool("ENABLE_CACHE", true),
		// Alert config
		AlertEnabled:          envBool("ALERT_ENABLED", false),
		SMTPHost:              env("SMTP_HOST", ""),
		SMTPPort:              envInt("SMTP_PORT", 587),
		SMTPUser:              env("SMTP_USER", ""),
		SMTPPassword:          env("SMTP_PASSWORD", ""),
		SMTPFrom:              env("SMTP_FROM", "telemetry@proxmoxved.local"),
		SMTPTo:                splitCSV(env("SMTP_TO", "")),
		SMTPUseTLS:            envBool("SMTP_USE_TLS", false),
		AlertFailureThreshold: envFloat("ALERT_FAILURE_THRESHOLD", 20.0),
		AlertCheckInterval:    time.Duration(envInt("ALERT_CHECK_INTERVAL_MIN", 15)) * time.Minute,
		AlertCooldown:         time.Duration(envInt("ALERT_COOLDOWN_MIN", 60)) * time.Minute,
	}

	var pt *ProxyTrust
	if strings.TrimSpace(env("TRUSTED_PROXIES_CIDR", "")) != "" {
		p, err := NewProxyTrust(cfg.TrustedProxiesCIDR)
		if err != nil {
			log.Fatalf("invalid TRUSTED_PROXIES_CIDR: %v", err)
		}
		pt = p
	}

	pb := NewPBClient(cfg)
	rl := NewRateLimiter(cfg.RateLimitRPM, cfg.RateBurst)

	// Initialize cache
	cache := NewCache(CacheConfig{
		RedisURL:    cfg.RedisURL,
		EnableRedis: cfg.EnableRedis,
		DefaultTTL:  cfg.CacheTTL,
	})

	// Initialize alerter
	alerter := NewAlerter(AlertConfig{
		Enabled:          cfg.AlertEnabled,
		SMTPHost:         cfg.SMTPHost,
		SMTPPort:         cfg.SMTPPort,
		SMTPUser:         cfg.SMTPUser,
		SMTPPassword:     cfg.SMTPPassword,
		SMTPFrom:         cfg.SMTPFrom,
		SMTPTo:           cfg.SMTPTo,
		UseTLS:           cfg.SMTPUseTLS,
		FailureThreshold: cfg.AlertFailureThreshold,
		CheckInterval:    cfg.AlertCheckInterval,
		Cooldown:         cfg.AlertCooldown,
	}, pb)
	alerter.Start()

	mux := http.NewServeMux()

	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		// Check PocketBase connectivity
		ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
		defer cancel()
		status := map[string]interface{}{
			"status": "ok",
			"time":   time.Now().UTC().Format(time.RFC3339),
		}
		code := http.StatusOK
		if err := pb.ensureAuth(ctx); err != nil {
			status["status"] = "degraded"
			status["pocketbase"] = "disconnected"
			code = http.StatusServiceUnavailable
		} else {
			status["pocketbase"] = "connected"
		}
		// Headers must be set before WriteHeader; anything set afterwards is
		// silently dropped (previously Content-Type was never sent).
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(code)
		_ = json.NewEncoder(w).Encode(status)
	})

	// Dashboard HTML page - serve on root
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "text/html; charset=utf-8")
		w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate")
		_, _ = w.Write([]byte(DashboardHTML()))
	})

	// Redirect /dashboard to / for backwards compatibility
	mux.HandleFunc("/dashboard", func(w http.ResponseWriter, r *http.Request) {
		http.Redirect(w, r, "/", http.StatusMovedPermanently)
	})

	// Prometheus-style metrics endpoint.
	// NOTE(review): these values cover a rolling 24h window and can decrease,
	// which conflicts with the Prometheus `counter` type declared below;
	// consider `gauge` (check existing dashboards/alerts before changing).
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
		defer cancel()
		data, err := pb.FetchDashboardData(ctx, 1) // Last 24h only for metrics
		if err != nil {
			http.Error(w, "failed to fetch metrics", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/plain; version=0.0.4")
		fmt.Fprintf(w, "# HELP telemetry_installs_total Total number of installations\n")
		fmt.Fprintf(w, "# TYPE telemetry_installs_total counter\n")
		fmt.Fprintf(w, "telemetry_installs_total %d\n\n", data.TotalInstalls)
		fmt.Fprintf(w, "# HELP telemetry_installs_success_total Successful installations\n")
		fmt.Fprintf(w, "# TYPE telemetry_installs_success_total counter\n")
		fmt.Fprintf(w, "telemetry_installs_success_total %d\n\n", data.SuccessCount)
		fmt.Fprintf(w, "# HELP telemetry_installs_failed_total Failed installations\n")
		fmt.Fprintf(w, "# TYPE telemetry_installs_failed_total counter\n")
		fmt.Fprintf(w, "telemetry_installs_failed_total %d\n\n", data.FailedCount)
		fmt.Fprintf(w, "# HELP telemetry_installs_pending Current installing count\n")
		fmt.Fprintf(w, "# TYPE telemetry_installs_pending gauge\n")
		fmt.Fprintf(w, "telemetry_installs_pending %d\n\n", data.InstallingCount)
		fmt.Fprintf(w, "# HELP telemetry_success_rate Success rate percentage\n")
		fmt.Fprintf(w, "# TYPE telemetry_success_rate gauge\n")
		fmt.Fprintf(w, "telemetry_success_rate %.2f\n", data.SuccessRate)
	})

	// Dashboard API endpoint (with caching)
	mux.HandleFunc("/api/dashboard", func(w http.ResponseWriter, r *http.Request) {
		days := 30
		if d := r.URL.Query().Get("days"); d != "" {
			fmt.Sscanf(d, "%d", &days)
			if days < 1 {
				days = 1
			}
			if days > 365 {
				days = 365
			}
		}
		ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
		defer cancel()
		// Try cache first
		cacheKey := fmt.Sprintf("dashboard:%d", days)
		var data *DashboardData
		if cfg.CacheEnabled && cache.Get(ctx, cacheKey, &data) {
			w.Header().Set("Content-Type", "application/json")
			w.Header().Set("X-Cache", "HIT")
			json.NewEncoder(w).Encode(data)
			return
		}
		data, err := pb.FetchDashboardData(ctx, days)
		if err != nil {
			log.Printf("dashboard fetch failed: %v", err)
			http.Error(w, "failed to fetch data", http.StatusInternalServerError)
			return
		}
		// Cache the result
		if cfg.CacheEnabled {
			_ = cache.Set(ctx, cacheKey, data, cfg.CacheTTL)
		}
		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("X-Cache", "MISS")
		json.NewEncoder(w).Encode(data)
	})

	// Paginated records API
	mux.HandleFunc("/api/records", func(w http.ResponseWriter, r *http.Request) {
		page := 1
		limit := 50
		status := r.URL.Query().Get("status")
		app := r.URL.Query().Get("app")
		osType := r.URL.Query().Get("os")
		sort := r.URL.Query().Get("sort")
		if p := r.URL.Query().Get("page"); p != "" {
			fmt.Sscanf(p, "%d", &page)
			if page < 1 {
				page = 1
			}
		}
		if l := r.URL.Query().Get("limit"); l != "" {
			fmt.Sscanf(l, "%d", &limit)
			if limit < 1 {
				limit = 1
			}
			if limit > 100 {
				limit = 100
			}
		}
		ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
		defer cancel()
		records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType, sort)
		if err != nil {
			log.Printf("records fetch failed: %v", err)
			http.Error(w, "failed to fetch records", http.StatusInternalServerError)
			return
		}
		response := map[string]interface{}{
			"records":     records,
			"page":        page,
			"limit":       limit,
			"total":       total,
			"total_pages": (total + limit - 1) / limit,
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(response)
	})

	// Alert history and test endpoints
	mux.HandleFunc("/api/alerts", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]interface{}{
			"enabled": cfg.AlertEnabled,
			"history": alerter.GetAlertHistory(),
		})
	})

	// NOTE(review): this endpoint has no authentication, so any client that
	// can reach the service can trigger a test alert send.
	mux.HandleFunc("/api/alerts/test", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		if err := alerter.TestAlert(); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("test alert sent"))
	})

	mux.HandleFunc("/telemetry", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		// rate key: IP or header (header allows non-identifying keys, but header can be abused too)
		var key string
		switch cfg.RateKeyMode {
		case "header":
			key = strings.TrimSpace(r.Header.Get(cfg.RateKeyHeader))
			if key == "" {
				key = "missing"
			}
		default:
			ip := getClientIP(r, pt)
			if ip == nil {
				key = "unknown"
			} else {
				// GDPR: do NOT store IP anywhere permanent; use it only in-memory for RL key
				key = ip.String()
			}
		}
		if !rl.Allow(key) {
			http.Error(w, "rate limited", http.StatusTooManyRequests)
			return
		}
		r.Body = http.MaxBytesReader(w, r.Body, cfg.MaxBodyBytes)
		raw, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "invalid body", http.StatusBadRequest)
			return
		}
		// strict JSON decode (no unknown fields)
		var in TelemetryIn
		dec := json.NewDecoder(bytes.NewReader(raw))
		dec.DisallowUnknownFields()
		if err := dec.Decode(&in); err != nil {
			http.Error(w, "invalid json", http.StatusBadRequest)
			return
		}
		if err := validate(&in); err != nil {
			http.Error(w, "invalid payload", http.StatusBadRequest)
			return
		}
		// Map input to PocketBase schema
		out := TelemetryOut{
			RandomID:        in.RandomID,
			Type:            in.Type,
			NSAPP:           in.NSAPP,
			Status:          in.Status,
			CTType:          in.CTType,
			DiskSize:        in.DiskSize,
			CoreCount:       in.CoreCount,
			RAMSize:         in.RAMSize,
			OsType:          in.OsType,
			OsVersion:       in.OsVersion,
			PveVer:          in.PveVer,
			Method:          in.Method,
			Error:           in.Error,
			ExitCode:        in.ExitCode,
			GPUVendor:       in.GPUVendor,
			GPUModel:        in.GPUModel,
			GPUPassthrough:  in.GPUPassthrough,
			CPUVendor:       in.CPUVendor,
			CPUModel:        in.CPUModel,
			RAMSpeed:        in.RAMSpeed,
			InstallDuration: in.InstallDuration,
			ErrorCategory:   in.ErrorCategory,
		}
		_ = computeHash(out) // For future deduplication
		ctx, cancel := context.WithTimeout(r.Context(), cfg.RequestTimeout)
		defer cancel()
		// Upsert: Creates new record if random_id doesn't exist, updates if it does
		// Routes to correct collection based on repo_source
		if err := pb.UpsertTelemetry(ctx, out, in.RepoSource); err != nil {
			// GDPR: don't log raw payload, don't log IPs; log only generic error
			log.Printf("pocketbase write failed: %v", err)
			http.Error(w, "upstream error", http.StatusBadGateway)
			return
		}
		if cfg.EnableReqLogging {
			log.Printf("telemetry accepted nsapp=%s status=%s", out.NSAPP, out.Status)
		}
		w.WriteHeader(http.StatusAccepted)
		_, _ = w.Write([]byte("accepted"))
	})

	srv := &http.Server{
		Addr:              cfg.ListenAddr,
		Handler:           securityHeaders(mux),
		ReadHeaderTimeout: 3 * time.Second,
		// Also bound body reads and idle keep-alive connections. With only
		// ReadHeaderTimeout set, a slow client can hold a connection open
		// indefinitely.
		ReadTimeout: 10 * time.Second,
		IdleTimeout: 120 * time.Second,
	}
	log.Printf("telemetry-ingest listening on %s", cfg.ListenAddr)
	log.Fatal(srv.ListenAndServe())
}
// securityHeaders wraps next so that every response carries a small set of
// defensive headers: no MIME sniffing, no framing, and no Referer leakage.
// The service sets no cookies, so nothing beyond these is needed.
func securityHeaders(next http.Handler) http.Handler {
	fixed := [...][2]string{
		{"X-Content-Type-Options", "nosniff"},
		{"X-Frame-Options", "DENY"},
		{"Referrer-Policy", "no-referrer"},
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		h := w.Header()
		for _, kv := range fixed {
			h.Set(kv[0], kv[1])
		}
		next.ServeHTTP(w, r)
	})
}
// env returns the value of environment variable k, or def when k is unset
// or set to the empty string. The value is not trimmed.
func env(k, def string) string {
	if v := os.Getenv(k); v != "" {
		return v
	}
	return def
}
// mustEnv returns the value of environment variable k. It terminates the
// process via log.Fatalf when the variable is unset or empty.
func mustEnv(k string) string {
	v, ok := os.LookupEnv(k)
	if !ok || v == "" {
		log.Fatalf("missing env %s", k)
	}
	return v
}
// envInt reads environment variable k as a base-10 integer. It returns def
// when the variable is unset, blank, or not exactly one integer after
// trimming surrounding whitespace.
//
// Previously any value that parsed to 0 other than the literal "0" (such as
// "00") fell back to def, and trailing junk was silently ignored ("60abc"
// became 60).
func envInt(k string, def int) int {
	v := strings.TrimSpace(os.Getenv(k))
	if v == "" {
		return def
	}
	var i int
	var trailing string
	// Exactly one scanned item means the value was a lone integer; a second
	// item means something followed it.
	if n, _ := fmt.Sscanf(v, "%d%s", &i, &trailing); n != 1 {
		return def
	}
	return i
}
// envInt64 reads environment variable k as a base-10 int64. It returns def
// when the variable is unset, blank, or not exactly one integer after
// trimming surrounding whitespace.
//
// Previously any value that parsed to 0 other than the literal "0" fell back
// to def, and trailing junk was silently ignored ("1024KB" became 1024).
func envInt64(k string, def int64) int64 {
	v := strings.TrimSpace(os.Getenv(k))
	if v == "" {
		return def
	}
	var i int64
	var trailing string
	// Exactly one scanned item means the value was a lone integer.
	if n, _ := fmt.Sscanf(v, "%d%s", &i, &trailing); n != 1 {
		return def
	}
	return i
}
// envBool reads environment variable k as a boolean, case-insensitively and
// ignoring surrounding whitespace. "1", "true", "yes" and "on" are true, and
// any other non-empty value is false. An unset or blank variable yields def.
func envBool(k string, def bool) bool {
	switch strings.ToLower(strings.TrimSpace(os.Getenv(k))) {
	case "":
		return def
	case "1", "true", "yes", "on":
		return true
	default:
		return false
	}
}
// envFloat reads environment variable k as a float64. It returns def when the
// variable is unset, blank, or not exactly one number after trimming
// surrounding whitespace.
//
// Previously every zero spelling other than the literal "0" ("0.0", "0.",
// "0e0") silently fell back to def, so ALERT_FAILURE_THRESHOLD=0.0 was
// ignored. Trailing junk such as "20%" was also accepted as 20.
func envFloat(k string, def float64) float64 {
	v := strings.TrimSpace(os.Getenv(k))
	if v == "" {
		return def
	}
	var f float64
	var trailing string
	// Exactly one scanned item means the value was a lone number.
	if n, _ := fmt.Sscanf(v, "%f%s", &f, &trailing); n != 1 {
		return def
	}
	return f
}
// splitCSV splits s on commas, trims each item and drops empty ones. It
// returns nil when nothing remains, including for an empty or blank s.
func splitCSV(s string) []string {
	var fields []string
	for _, f := range strings.Split(s, ",") {
		if f = strings.TrimSpace(f); f != "" {
			fields = append(fields, f)
		}
	}
	return fields
}