Unify telemetry storage and add repo filtering
Refactor telemetry backend to store all telemetry in a single collection and add repo_source-based filtering. Key changes: - Added detect_repo_source() in misc/api.func to auto-detect/export REPO_SOURCE (ProxmoxVE/ProxmoxVED/external) when scripts are sourced. - Consolidated PocketBase collections into a single default collection (_telemetry_data) across service, migration, and scripts; updated defaults in migrate.go, migration.go, migrate.sh and migration shell scripts. - Simplified PBClient to use one targetColl and removed collection resolution logic; updated create/update/find/fetch functions to use targetColl. - Introduced repo_source field (values: "ProxmoxVE", "ProxmoxVED", "external") on telemetry records and telemetry payloads; updated validation and logging. - Added repo filtering to dashboard endpoints, FetchDashboardData and FetchRecordsPaginated, plus a repo selector in the dashboard UI; default filter is ProxmoxVE (production), with an "all" option. - Adjusted API handlers and callers to pass repo filters and include repo_source when upserting telemetry. - Misc: updated comments, error messages, and logging to reflect the new model; added telemetry-service.exe binary. Purpose: simplify data model (single collection), make telemetry attributable to repository sources, and enable dashboard filtering by repo/source.
This commit is contained in:
parent
6529949f69
commit
dc47c1c1c3
@ -37,9 +37,74 @@ TELEMETRY_URL="https://telemetry.community-scripts.org/telemetry"
|
|||||||
# Timeout for telemetry requests (seconds)
|
# Timeout for telemetry requests (seconds)
|
||||||
TELEMETRY_TIMEOUT=5
|
TELEMETRY_TIMEOUT=5
|
||||||
|
|
||||||
# Repository source identifier (auto-transformed by CI on promotion to ProxmoxVE)
|
# ==============================================================================
|
||||||
# DO NOT CHANGE - this is used by the telemetry service to route data to the correct collection
|
# SECTION 0: REPOSITORY SOURCE DETECTION
|
||||||
REPO_SOURCE="community-scripts/ProxmoxVED"
|
# ==============================================================================
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
# detect_repo_source()
|
||||||
|
#
|
||||||
|
# - Dynamically detects which GitHub/Gitea repo the scripts were loaded from
|
||||||
|
# - Inspects /proc/$$/cmdline and $0 to find the source URL
|
||||||
|
# - Maps detected repo to one of three canonical values:
|
||||||
|
# * "ProxmoxVE" — official community-scripts/ProxmoxVE (production)
|
||||||
|
# * "ProxmoxVED" — official community-scripts/ProxmoxVED (development)
|
||||||
|
# * "external" — any fork or unknown source
|
||||||
|
# - Fallback: "ProxmoxVED" (CI sed transforms ProxmoxVED → ProxmoxVE on promotion)
|
||||||
|
# - Sets and exports REPO_SOURCE global variable
|
||||||
|
# - Skips detection if REPO_SOURCE is already set (e.g., by environment)
|
||||||
|
# ------------------------------------------------------------------------------
|
||||||
|
detect_repo_source() {
|
||||||
|
# Allow explicit override via environment
|
||||||
|
[[ -n "${REPO_SOURCE:-}" ]] && return 0
|
||||||
|
|
||||||
|
local content="" owner_repo=""
|
||||||
|
|
||||||
|
# Method 1: Read from /proc/$$/cmdline
|
||||||
|
# When invoked via: bash -c "$(curl -fsSL https://.../ct/app.sh)"
|
||||||
|
# the full CT/VM script content is in /proc/$$/cmdline (same PID through source chain)
|
||||||
|
if [[ -r /proc/$$/cmdline ]]; then
|
||||||
|
content=$(tr '\0' ' ' </proc/$$/cmdline 2>/dev/null) || true
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Method 2: Read from the original script file (bash ct/app.sh / bash vm/app.sh)
|
||||||
|
if [[ -z "$content" ]] || ! echo "$content" | grep -qE 'githubusercontent\.com|community-scripts\.org' 2>/dev/null; then
|
||||||
|
if [[ -f "$0" ]] && [[ "$0" != *bash* ]]; then
|
||||||
|
content=$(head -10 "$0" 2>/dev/null) || true
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Extract owner/repo from URL patterns found in the script content
|
||||||
|
if [[ -n "$content" ]]; then
|
||||||
|
# GitHub raw URL: raw.githubusercontent.com/OWNER/REPO/...
|
||||||
|
owner_repo=$(echo "$content" | grep -oE 'raw\.githubusercontent\.com/[^/]+/[^/]+' | head -1 | sed 's|raw\.githubusercontent\.com/||') || true
|
||||||
|
|
||||||
|
# Gitea URL: git.community-scripts.org/OWNER/REPO/...
|
||||||
|
if [[ -z "$owner_repo" ]]; then
|
||||||
|
owner_repo=$(echo "$content" | grep -oE 'git\.community-scripts\.org/[^/]+/[^/]+' | head -1 | sed 's|git\.community-scripts\.org/||') || true
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Map detected owner/repo to canonical repo_source value
|
||||||
|
case "$owner_repo" in
|
||||||
|
community-scripts/ProxmoxVE) REPO_SOURCE="ProxmoxVE" ;;
|
||||||
|
community-scripts/ProxmoxVED) REPO_SOURCE="ProxmoxVED" ;;
|
||||||
|
"")
|
||||||
|
# No URL detected — use hardcoded fallback
|
||||||
|
# CI sed transforms this on promotion: ProxmoxVED → ProxmoxVE
|
||||||
|
REPO_SOURCE="ProxmoxVED"
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
# Fork or unknown repo
|
||||||
|
REPO_SOURCE="external"
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
|
||||||
|
export REPO_SOURCE
|
||||||
|
}
|
||||||
|
|
||||||
|
# Run detection immediately when api.func is sourced
|
||||||
|
detect_repo_source
|
||||||
|
|
||||||
# ==============================================================================
|
# ==============================================================================
|
||||||
# SECTION 1: ERROR CODE DESCRIPTIONS
|
# SECTION 1: ERROR CODE DESCRIPTIONS
|
||||||
|
|||||||
@ -134,7 +134,7 @@ func (a *Alerter) checkAndAlert() {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Fetch last hour's data
|
// Fetch last hour's data
|
||||||
data, err := a.pb.FetchDashboardData(ctx, 1)
|
data, err := a.pb.FetchDashboardData(ctx, 1, "ProxmoxVE")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("WARN: alert check failed: %v", err)
|
log.Printf("WARN: alert check failed: %v", err)
|
||||||
return
|
return
|
||||||
@ -410,13 +410,13 @@ func (a *Alerter) fetchWeeklyReportData(ctx context.Context) (*WeeklyReportData,
|
|||||||
year, week := lastMonday.ISOWeek()
|
year, week := lastMonday.ISOWeek()
|
||||||
|
|
||||||
// Fetch current week's data (7 days)
|
// Fetch current week's data (7 days)
|
||||||
currentData, err := a.pb.FetchDashboardData(ctx, 7)
|
currentData, err := a.pb.FetchDashboardData(ctx, 7, "ProxmoxVE")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to fetch current week data: %w", err)
|
return nil, fmt.Errorf("failed to fetch current week data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch previous week's data for comparison (14 days, we'll compare)
|
// Fetch previous week's data for comparison (14 days, we'll compare)
|
||||||
prevData, err := a.pb.FetchDashboardData(ctx, 14)
|
prevData, err := a.pb.FetchDashboardData(ctx, 14, "ProxmoxVE")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Non-fatal, just log
|
// Non-fatal, just log
|
||||||
log.Printf("WARN: could not fetch previous week data: %v", err)
|
log.Printf("WARN: could not fetch previous week data: %v", err)
|
||||||
|
|||||||
@ -108,7 +108,7 @@ func (c *Cleaner) findStuckInstallations(ctx context.Context) ([]StuckRecord, er
|
|||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||||
fmt.Sprintf("%s/api/collections/%s/records?filter=%s&perPage=100",
|
fmt.Sprintf("%s/api/collections/%s/records?filter=%s&perPage=100",
|
||||||
c.pb.baseURL, c.pb.devColl, filter),
|
c.pb.baseURL, c.pb.targetColl, filter),
|
||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@ -104,20 +104,31 @@ type AddonCount struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FetchDashboardData retrieves aggregated data from PocketBase
|
// FetchDashboardData retrieves aggregated data from PocketBase
|
||||||
func (p *PBClient) FetchDashboardData(ctx context.Context, days int) (*DashboardData, error) {
|
// repoSource filters by repo_source field ("ProxmoxVE", "ProxmoxVED", "external", or "" for all)
|
||||||
|
func (p *PBClient) FetchDashboardData(ctx context.Context, days int, repoSource string) (*DashboardData, error) {
|
||||||
if err := p.ensureAuth(ctx); err != nil {
|
if err := p.ensureAuth(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
data := &DashboardData{}
|
data := &DashboardData{}
|
||||||
|
|
||||||
// Calculate date filter (days=0 means all entries)
|
// Build filter parts
|
||||||
var filter string
|
var filterParts []string
|
||||||
|
|
||||||
|
// Date filter (days=0 means all entries)
|
||||||
if days > 0 {
|
if days > 0 {
|
||||||
since := time.Now().AddDate(0, 0, -days).Format("2006-01-02 00:00:00")
|
since := time.Now().AddDate(0, 0, -days).Format("2006-01-02 00:00:00")
|
||||||
filter = url.QueryEscape(fmt.Sprintf("created >= '%s'", since))
|
filterParts = append(filterParts, fmt.Sprintf("created >= '%s'", since))
|
||||||
} else {
|
}
|
||||||
filter = "" // No filter = all entries
|
|
||||||
|
// Repo source filter
|
||||||
|
if repoSource != "" {
|
||||||
|
filterParts = append(filterParts, fmt.Sprintf("repo_source = '%s'", repoSource))
|
||||||
|
}
|
||||||
|
|
||||||
|
var filter string
|
||||||
|
if len(filterParts) > 0 {
|
||||||
|
filter = url.QueryEscape(strings.Join(filterParts, " && "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch all records for the period
|
// Fetch all records for the period
|
||||||
@ -306,10 +317,10 @@ func (p *PBClient) fetchRecords(ctx context.Context, filter string) ([]Telemetry
|
|||||||
var url string
|
var url string
|
||||||
if filter != "" {
|
if filter != "" {
|
||||||
url = fmt.Sprintf("%s/api/collections/%s/records?filter=%s&sort=-created&page=%d&perPage=%d",
|
url = fmt.Sprintf("%s/api/collections/%s/records?filter=%s&sort=-created&page=%d&perPage=%d",
|
||||||
p.baseURL, p.devColl, filter, page, perPage)
|
p.baseURL, p.targetColl, filter, page, perPage)
|
||||||
} else {
|
} else {
|
||||||
url = fmt.Sprintf("%s/api/collections/%s/records?sort=-created&page=%d&perPage=%d",
|
url = fmt.Sprintf("%s/api/collections/%s/records?sort=-created&page=%d&perPage=%d",
|
||||||
p.baseURL, p.devColl, page, perPage)
|
p.baseURL, p.targetColl, page, perPage)
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
@ -1413,6 +1424,12 @@ func DashboardHTML() string {
|
|||||||
Telemetry Dashboard
|
Telemetry Dashboard
|
||||||
</h1>
|
</h1>
|
||||||
<div class="controls">
|
<div class="controls">
|
||||||
|
<select id="repoFilter" onchange="refreshData()" title="Filter by repository source">
|
||||||
|
<option value="ProxmoxVE" selected>ProxmoxVE (Production)</option>
|
||||||
|
<option value="ProxmoxVED">ProxmoxVED (Development)</option>
|
||||||
|
<option value="external">External (Forks)</option>
|
||||||
|
<option value="all">All Sources</option>
|
||||||
|
</select>
|
||||||
<div class="quickfilter">
|
<div class="quickfilter">
|
||||||
<button class="filter-btn" data-days="7">7 Days</button>
|
<button class="filter-btn" data-days="7">7 Days</button>
|
||||||
<button class="filter-btn active" data-days="30">30 Days</button>
|
<button class="filter-btn active" data-days="30">30 Days</button>
|
||||||
@ -1676,8 +1693,9 @@ func DashboardHTML() string {
|
|||||||
async function fetchData() {
|
async function fetchData() {
|
||||||
const activeBtn = document.querySelector('.filter-btn.active');
|
const activeBtn = document.querySelector('.filter-btn.active');
|
||||||
const days = activeBtn ? activeBtn.dataset.days : '30';
|
const days = activeBtn ? activeBtn.dataset.days : '30';
|
||||||
|
const repo = document.getElementById('repoFilter').value;
|
||||||
try {
|
try {
|
||||||
const response = await fetch('/api/dashboard?days=' + days);
|
const response = await fetch('/api/dashboard?days=' + days + '&repo=' + repo);
|
||||||
if (!response.ok) throw new Error('Failed to fetch data');
|
if (!response.ok) throw new Error('Failed to fetch data');
|
||||||
return await response.json();
|
return await response.json();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|||||||
@ -93,13 +93,13 @@ func main() {
|
|||||||
pbCollection = os.Getenv("PB_TARGET_COLLECTION")
|
pbCollection = os.Getenv("PB_TARGET_COLLECTION")
|
||||||
}
|
}
|
||||||
if pbCollection == "" {
|
if pbCollection == "" {
|
||||||
pbCollection = "_dev_telemetry_data"
|
pbCollection = "_telemetry_data"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Auth collection
|
// Auth collection
|
||||||
authCollection := os.Getenv("PB_AUTH_COLLECTION")
|
authCollection := os.Getenv("PB_AUTH_COLLECTION")
|
||||||
if authCollection == "" {
|
if authCollection == "" {
|
||||||
authCollection = "_dev_telemetry_service"
|
authCollection = "_telemetry_service"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Credentials
|
// Credentials
|
||||||
|
|||||||
@ -13,7 +13,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
|||||||
|
|
||||||
# Default values
|
# Default values
|
||||||
POCKETBASE_URL="${1:-http://localhost:8090}"
|
POCKETBASE_URL="${1:-http://localhost:8090}"
|
||||||
POCKETBASE_COLLECTION="${2:-_dev_telemetry_data}"
|
POCKETBASE_COLLECTION="${2:-_telemetry_data}"
|
||||||
|
|
||||||
echo "============================================="
|
echo "============================================="
|
||||||
echo " ProxmoxVED Data Migration Tool"
|
echo " ProxmoxVED Data Migration Tool"
|
||||||
|
|||||||
@ -13,7 +13,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
|||||||
|
|
||||||
# Default values
|
# Default values
|
||||||
POCKETBASE_URL="${1:-http://localhost:8090}"
|
POCKETBASE_URL="${1:-http://localhost:8090}"
|
||||||
POCKETBASE_COLLECTION="${2:-_dev_telemetry_data}"
|
POCKETBASE_COLLECTION="${2:-_telemetry_data}"
|
||||||
|
|
||||||
echo "============================================="
|
echo "============================================="
|
||||||
echo " ProxmoxVED Data Migration Tool"
|
echo " ProxmoxVED Data Migration Tool"
|
||||||
|
|||||||
@ -95,13 +95,13 @@ func main() {
|
|||||||
pbCollection = os.Getenv("PB_TARGET_COLLECTION")
|
pbCollection = os.Getenv("PB_TARGET_COLLECTION")
|
||||||
}
|
}
|
||||||
if pbCollection == "" {
|
if pbCollection == "" {
|
||||||
pbCollection = "_dev_telemetry_data"
|
pbCollection = "_telemetry_data"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Auth collection
|
// Auth collection
|
||||||
authCollection := os.Getenv("PB_AUTH_COLLECTION")
|
authCollection := os.Getenv("PB_AUTH_COLLECTION")
|
||||||
if authCollection == "" {
|
if authCollection == "" {
|
||||||
authCollection = "_dev_telemetry_service"
|
authCollection = "_telemetry_service"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Credentials - prefer admin auth for timestamp preservation
|
// Credentials - prefer admin auth for timestamp preservation
|
||||||
|
|||||||
@ -24,11 +24,10 @@ type Config struct {
|
|||||||
|
|
||||||
// PocketBase
|
// PocketBase
|
||||||
PBBaseURL string
|
PBBaseURL string
|
||||||
PBAuthCollection string // "_dev_telemetry_service"
|
PBAuthCollection string // "_telemetry_service"
|
||||||
PBIdentity string // email
|
PBIdentity string // email
|
||||||
PBPassword string
|
PBPassword string
|
||||||
PBTargetColl string // "_dev_telemetry_data" (dev default)
|
PBTargetColl string // "_telemetry_data"
|
||||||
PBLiveTargetColl string // "_live_telemetry_data" (production)
|
|
||||||
|
|
||||||
// Limits
|
// Limits
|
||||||
MaxBodyBytes int64
|
MaxBodyBytes int64
|
||||||
@ -104,10 +103,10 @@ type TelemetryIn struct {
|
|||||||
ErrorCategory string `json:"error_category,omitempty"` // "network", "storage", "dependency", "permission", "timeout", "unknown"
|
ErrorCategory string `json:"error_category,omitempty"` // "network", "storage", "dependency", "permission", "timeout", "unknown"
|
||||||
|
|
||||||
// Repository source for collection routing
|
// Repository source for collection routing
|
||||||
RepoSource string `json:"repo_source,omitempty"` // "community-scripts/ProxmoxVE" or "community-scripts/ProxmoxVED"
|
RepoSource string `json:"repo_source,omitempty"` // "ProxmoxVE", "ProxmoxVED", or "external"
|
||||||
}
|
}
|
||||||
|
|
||||||
// TelemetryOut is sent to PocketBase (matches _dev_telemetry_data collection)
|
// TelemetryOut is sent to PocketBase (matches _telemetry_data collection)
|
||||||
type TelemetryOut struct {
|
type TelemetryOut struct {
|
||||||
RandomID string `json:"random_id"`
|
RandomID string `json:"random_id"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
@ -133,6 +132,9 @@ type TelemetryOut struct {
|
|||||||
RAMSpeed string `json:"ram_speed,omitempty"`
|
RAMSpeed string `json:"ram_speed,omitempty"`
|
||||||
InstallDuration int `json:"install_duration,omitempty"`
|
InstallDuration int `json:"install_duration,omitempty"`
|
||||||
ErrorCategory string `json:"error_category,omitempty"`
|
ErrorCategory string `json:"error_category,omitempty"`
|
||||||
|
|
||||||
|
// Repository source: "ProxmoxVE", "ProxmoxVED", or "external"
|
||||||
|
RepoSource string `json:"repo_source,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TelemetryStatusUpdate contains only fields needed for status updates
|
// TelemetryStatusUpdate contains only fields needed for status updates
|
||||||
@ -150,10 +152,11 @@ type TelemetryStatusUpdate struct {
|
|||||||
RAMSpeed string `json:"ram_speed,omitempty"`
|
RAMSpeed string `json:"ram_speed,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allowed values for 'repo_source' field — controls collection routing
|
// Allowed values for 'repo_source' field
|
||||||
var allowedRepoSource = map[string]bool{
|
var allowedRepoSource = map[string]bool{
|
||||||
"community-scripts/ProxmoxVE": true,
|
"ProxmoxVE": true,
|
||||||
"community-scripts/ProxmoxVED": true,
|
"ProxmoxVED": true,
|
||||||
|
"external": true,
|
||||||
}
|
}
|
||||||
|
|
||||||
type PBClient struct {
|
type PBClient struct {
|
||||||
@ -161,8 +164,7 @@ type PBClient struct {
|
|||||||
authCollection string
|
authCollection string
|
||||||
identity string
|
identity string
|
||||||
password string
|
password string
|
||||||
devColl string // "_dev_telemetry_data"
|
targetColl string // single collection for all telemetry data
|
||||||
liveColl string // "_live_telemetry_data"
|
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
token string
|
token string
|
||||||
@ -176,25 +178,13 @@ func NewPBClient(cfg Config) *PBClient {
|
|||||||
authCollection: cfg.PBAuthCollection,
|
authCollection: cfg.PBAuthCollection,
|
||||||
identity: cfg.PBIdentity,
|
identity: cfg.PBIdentity,
|
||||||
password: cfg.PBPassword,
|
password: cfg.PBPassword,
|
||||||
devColl: cfg.PBTargetColl,
|
targetColl: cfg.PBTargetColl,
|
||||||
liveColl: cfg.PBLiveTargetColl,
|
|
||||||
http: &http.Client{
|
http: &http.Client{
|
||||||
Timeout: cfg.RequestTimeout,
|
Timeout: cfg.RequestTimeout,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolveCollection maps a repo_source value to the correct PocketBase collection.
|
|
||||||
// - "community-scripts/ProxmoxVE" → live collection
|
|
||||||
// - "community-scripts/ProxmoxVED" → dev collection
|
|
||||||
// - empty / unknown → dev collection (safe default)
|
|
||||||
func (p *PBClient) resolveCollection(repoSource string) string {
|
|
||||||
if repoSource == "community-scripts/ProxmoxVE" && p.liveColl != "" {
|
|
||||||
return p.liveColl
|
|
||||||
}
|
|
||||||
return p.devColl
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *PBClient) ensureAuth(ctx context.Context) error {
|
func (p *PBClient) ensureAuth(ctx context.Context) error {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
@ -246,8 +236,8 @@ func (p *PBClient) ensureAuth(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindRecordByRandomID searches for an existing record by random_id in the given collection
|
// FindRecordByRandomID searches for an existing record by random_id
|
||||||
func (p *PBClient) FindRecordByRandomID(ctx context.Context, coll, randomID string) (string, error) {
|
func (p *PBClient) FindRecordByRandomID(ctx context.Context, randomID string) (string, error) {
|
||||||
if err := p.ensureAuth(ctx); err != nil {
|
if err := p.ensureAuth(ctx); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -256,7 +246,7 @@ func (p *PBClient) FindRecordByRandomID(ctx context.Context, coll, randomID stri
|
|||||||
filter := fmt.Sprintf("random_id='%s'", randomID)
|
filter := fmt.Sprintf("random_id='%s'", randomID)
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
|
||||||
fmt.Sprintf("%s/api/collections/%s/records?filter=%s&fields=id&perPage=1",
|
fmt.Sprintf("%s/api/collections/%s/records?filter=%s&fields=id&perPage=1",
|
||||||
p.baseURL, coll, filter),
|
p.baseURL, p.targetColl, filter),
|
||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -290,14 +280,14 @@ func (p *PBClient) FindRecordByRandomID(ctx context.Context, coll, randomID stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UpdateTelemetryStatus updates only status, error, and exit_code of an existing record
|
// UpdateTelemetryStatus updates only status, error, and exit_code of an existing record
|
||||||
func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, coll, recordID string, update TelemetryStatusUpdate) error {
|
func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, recordID string, update TelemetryStatusUpdate) error {
|
||||||
if err := p.ensureAuth(ctx); err != nil {
|
if err := p.ensureAuth(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, _ := json.Marshal(update)
|
b, _ := json.Marshal(update)
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPatch,
|
req, err := http.NewRequestWithContext(ctx, http.MethodPatch,
|
||||||
fmt.Sprintf("%s/api/collections/%s/records/%s", p.baseURL, coll, recordID),
|
fmt.Sprintf("%s/api/collections/%s/records/%s", p.baseURL, p.targetColl, recordID),
|
||||||
bytes.NewReader(b),
|
bytes.NewReader(b),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -319,8 +309,7 @@ func (p *PBClient) UpdateTelemetryStatus(ctx context.Context, coll, recordID str
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FetchRecordsPaginated retrieves records with pagination and optional filters.
|
// FetchRecordsPaginated retrieves records with pagination and optional filters.
|
||||||
// Uses devColl by default (dashboard shows dev data); for live data, use separate endpoint if needed.
|
func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType, sortField, repoSource string) ([]TelemetryRecord, int, error) {
|
||||||
func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, status, app, osType, sortField string) ([]TelemetryRecord, int, error) {
|
|
||||||
if err := p.ensureAuth(ctx); err != nil {
|
if err := p.ensureAuth(ctx); err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
@ -336,6 +325,9 @@ func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, s
|
|||||||
if osType != "" {
|
if osType != "" {
|
||||||
filters = append(filters, fmt.Sprintf("os_type='%s'", osType))
|
filters = append(filters, fmt.Sprintf("os_type='%s'", osType))
|
||||||
}
|
}
|
||||||
|
if repoSource != "" {
|
||||||
|
filters = append(filters, fmt.Sprintf("repo_source='%s'", repoSource))
|
||||||
|
}
|
||||||
|
|
||||||
filterStr := ""
|
filterStr := ""
|
||||||
if len(filters) > 0 {
|
if len(filters) > 0 {
|
||||||
@ -361,7 +353,7 @@ func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
reqURL := fmt.Sprintf("%s/api/collections/%s/records?sort=%s&page=%d&perPage=%d%s",
|
reqURL := fmt.Sprintf("%s/api/collections/%s/records?sort=%s&page=%d&perPage=%d%s",
|
||||||
p.baseURL, p.devColl, sort, page, limit, filterStr)
|
p.baseURL, p.targetColl, sort, page, limit, filterStr)
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -391,22 +383,18 @@ func (p *PBClient) FetchRecordsPaginated(ctx context.Context, page, limit int, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UpsertTelemetry handles both creation and updates intelligently.
|
// UpsertTelemetry handles both creation and updates intelligently.
|
||||||
// Routes to the correct PocketBase collection based on repoSource:
|
// All records go to the same collection; repo_source is stored as a field.
|
||||||
// - "community-scripts/ProxmoxVE" → _live_telemetry_data
|
|
||||||
// - "community-scripts/ProxmoxVED" → _dev_telemetry_data
|
|
||||||
//
|
//
|
||||||
// For status="installing": always creates a new record.
|
// For status="installing": always creates a new record.
|
||||||
// For status!="installing": updates existing record (found by random_id).
|
// For status!="installing": updates existing record (found by random_id).
|
||||||
func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut, repoSource string) error {
|
func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut) error {
|
||||||
coll := p.resolveCollection(repoSource)
|
|
||||||
|
|
||||||
// For "installing" status, always create new record
|
// For "installing" status, always create new record
|
||||||
if payload.Status == "installing" {
|
if payload.Status == "installing" {
|
||||||
return p.CreateTelemetry(ctx, coll, payload)
|
return p.CreateTelemetry(ctx, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
// For status updates (success/failed/unknown), find and update existing record
|
// For status updates (success/failed/unknown), find and update existing record
|
||||||
recordID, err := p.FindRecordByRandomID(ctx, coll, payload.RandomID)
|
recordID, err := p.FindRecordByRandomID(ctx, payload.RandomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Search failed, log and return error
|
// Search failed, log and return error
|
||||||
return fmt.Errorf("cannot find record to update: %w", err)
|
return fmt.Errorf("cannot find record to update: %w", err)
|
||||||
@ -415,7 +403,7 @@ func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut, re
|
|||||||
if recordID == "" {
|
if recordID == "" {
|
||||||
// Record not found - this shouldn't happen normally
|
// Record not found - this shouldn't happen normally
|
||||||
// Create a full record as fallback
|
// Create a full record as fallback
|
||||||
return p.CreateTelemetry(ctx, coll, payload)
|
return p.CreateTelemetry(ctx, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update only status, error, exit_code, and new metrics fields
|
// Update only status, error, exit_code, and new metrics fields
|
||||||
@ -432,17 +420,17 @@ func (p *PBClient) UpsertTelemetry(ctx context.Context, payload TelemetryOut, re
|
|||||||
CPUModel: payload.CPUModel,
|
CPUModel: payload.CPUModel,
|
||||||
RAMSpeed: payload.RAMSpeed,
|
RAMSpeed: payload.RAMSpeed,
|
||||||
}
|
}
|
||||||
return p.UpdateTelemetryStatus(ctx, coll, recordID, update)
|
return p.UpdateTelemetryStatus(ctx, recordID, update)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PBClient) CreateTelemetry(ctx context.Context, coll string, payload TelemetryOut) error {
|
func (p *PBClient) CreateTelemetry(ctx context.Context, payload TelemetryOut) error {
|
||||||
if err := p.ensureAuth(ctx); err != nil {
|
if err := p.ensureAuth(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, _ := json.Marshal(payload)
|
b, _ := json.Marshal(payload)
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
||||||
fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, coll),
|
fmt.Sprintf("%s/api/collections/%s/records", p.baseURL, p.targetColl),
|
||||||
bytes.NewReader(b),
|
bytes.NewReader(b),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -730,9 +718,9 @@ func validate(in *TelemetryIn) error {
|
|||||||
return errors.New("invalid install_duration (max 24h)")
|
return errors.New("invalid install_duration (max 24h)")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate repo_source: must be an allowed repository or empty
|
// Validate repo_source: must be a known value or empty
|
||||||
if in.RepoSource != "" && !allowedRepoSource[in.RepoSource] {
|
if in.RepoSource != "" && !allowedRepoSource[in.RepoSource] {
|
||||||
return errors.New("invalid repo_source (must be 'community-scripts/ProxmoxVE' or 'community-scripts/ProxmoxVED')")
|
return fmt.Errorf("rejected repo_source '%s' (must be 'ProxmoxVE', 'ProxmoxVED', or 'external')", in.RepoSource)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -755,11 +743,10 @@ func main() {
|
|||||||
TrustedProxiesCIDR: splitCSV(env("TRUSTED_PROXIES_CIDR", "")),
|
TrustedProxiesCIDR: splitCSV(env("TRUSTED_PROXIES_CIDR", "")),
|
||||||
|
|
||||||
PBBaseURL: mustEnv("PB_URL"),
|
PBBaseURL: mustEnv("PB_URL"),
|
||||||
PBAuthCollection: env("PB_AUTH_COLLECTION", "_dev_telemetry_service"),
|
PBAuthCollection: env("PB_AUTH_COLLECTION", "_telemetry_service"),
|
||||||
PBIdentity: mustEnv("PB_IDENTITY"),
|
PBIdentity: mustEnv("PB_IDENTITY"),
|
||||||
PBPassword: mustEnv("PB_PASSWORD"),
|
PBPassword: mustEnv("PB_PASSWORD"),
|
||||||
PBTargetColl: env("PB_TARGET_COLLECTION", "_dev_telemetry_data"),
|
PBTargetColl: env("PB_TARGET_COLLECTION", "_telemetry_data"),
|
||||||
PBLiveTargetColl: env("PB_LIVE_TARGET_COLLECTION", "_live_telemetry_data"),
|
|
||||||
|
|
||||||
MaxBodyBytes: envInt64("MAX_BODY_BYTES", 1024),
|
MaxBodyBytes: envInt64("MAX_BODY_BYTES", 1024),
|
||||||
RateLimitRPM: envInt("RATE_LIMIT_RPM", 60),
|
RateLimitRPM: envInt("RATE_LIMIT_RPM", 60),
|
||||||
@ -870,7 +857,7 @@ func main() {
|
|||||||
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
data, err := pb.FetchDashboardData(ctx, 1) // Last 24h only for metrics
|
data, err := pb.FetchDashboardData(ctx, 1, "ProxmoxVE") // Last 24h, production only for metrics
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, "failed to fetch metrics", http.StatusInternalServerError)
|
http.Error(w, "failed to fetch metrics", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@ -907,11 +894,21 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// repo_source filter (default: ProxmoxVE)
|
||||||
|
repoSource := r.URL.Query().Get("repo")
|
||||||
|
if repoSource == "" {
|
||||||
|
repoSource = "ProxmoxVE"
|
||||||
|
}
|
||||||
|
// "all" means no filter
|
||||||
|
if repoSource == "all" {
|
||||||
|
repoSource = ""
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Try cache first
|
// Try cache first
|
||||||
cacheKey := fmt.Sprintf("dashboard:%d", days)
|
cacheKey := fmt.Sprintf("dashboard:%d:%s", days, repoSource)
|
||||||
var data *DashboardData
|
var data *DashboardData
|
||||||
if cfg.CacheEnabled && cache.Get(ctx, cacheKey, &data) {
|
if cfg.CacheEnabled && cache.Get(ctx, cacheKey, &data) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
@ -920,7 +917,7 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := pb.FetchDashboardData(ctx, days)
|
data, err := pb.FetchDashboardData(ctx, days, repoSource)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("dashboard fetch failed: %v", err)
|
log.Printf("dashboard fetch failed: %v", err)
|
||||||
http.Error(w, "failed to fetch data", http.StatusInternalServerError)
|
http.Error(w, "failed to fetch data", http.StatusInternalServerError)
|
||||||
@ -945,6 +942,10 @@ func main() {
|
|||||||
app := r.URL.Query().Get("app")
|
app := r.URL.Query().Get("app")
|
||||||
osType := r.URL.Query().Get("os")
|
osType := r.URL.Query().Get("os")
|
||||||
sort := r.URL.Query().Get("sort")
|
sort := r.URL.Query().Get("sort")
|
||||||
|
repoSource := r.URL.Query().Get("repo")
|
||||||
|
if repoSource == "" {
|
||||||
|
repoSource = "ProxmoxVE" // Default filter: production data
|
||||||
|
}
|
||||||
|
|
||||||
if p := r.URL.Query().Get("page"); p != "" {
|
if p := r.URL.Query().Get("page"); p != "" {
|
||||||
fmt.Sscanf(p, "%d", &page)
|
fmt.Sscanf(p, "%d", &page)
|
||||||
@ -965,7 +966,7 @@ func main() {
|
|||||||
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType, sort)
|
records, total, err := pb.FetchRecordsPaginated(ctx, page, limit, status, app, osType, sort, repoSource)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("records fetch failed: %v", err)
|
log.Printf("records fetch failed: %v", err)
|
||||||
http.Error(w, "failed to fetch records", http.StatusInternalServerError)
|
http.Error(w, "failed to fetch records", http.StatusInternalServerError)
|
||||||
@ -1052,6 +1053,9 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := validate(&in); err != nil {
|
if err := validate(&in); err != nil {
|
||||||
|
if cfg.EnableReqLogging {
|
||||||
|
log.Printf("telemetry rejected: %v", err)
|
||||||
|
}
|
||||||
http.Error(w, "invalid payload", http.StatusBadRequest)
|
http.Error(w, "invalid payload", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1080,6 +1084,7 @@ func main() {
|
|||||||
RAMSpeed: in.RAMSpeed,
|
RAMSpeed: in.RAMSpeed,
|
||||||
InstallDuration: in.InstallDuration,
|
InstallDuration: in.InstallDuration,
|
||||||
ErrorCategory: in.ErrorCategory,
|
ErrorCategory: in.ErrorCategory,
|
||||||
|
RepoSource: in.RepoSource,
|
||||||
}
|
}
|
||||||
_ = computeHash(out) // For future deduplication
|
_ = computeHash(out) // For future deduplication
|
||||||
|
|
||||||
@ -1087,8 +1092,8 @@ func main() {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Upsert: Creates new record if random_id doesn't exist, updates if it does
|
// Upsert: Creates new record if random_id doesn't exist, updates if it does
|
||||||
// Routes to correct collection based on repo_source
|
// repo_source is stored as a field on the record for filtering
|
||||||
if err := pb.UpsertTelemetry(ctx, out, in.RepoSource); err != nil {
|
if err := pb.UpsertTelemetry(ctx, out); err != nil {
|
||||||
// GDPR: don't log raw payload, don't log IPs; log only generic error
|
// GDPR: don't log raw payload, don't log IPs; log only generic error
|
||||||
log.Printf("pocketbase write failed: %v", err)
|
log.Printf("pocketbase write failed: %v", err)
|
||||||
http.Error(w, "upstream error", http.StatusBadGateway)
|
http.Error(w, "upstream error", http.StatusBadGateway)
|
||||||
@ -1096,7 +1101,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cfg.EnableReqLogging {
|
if cfg.EnableReqLogging {
|
||||||
log.Printf("telemetry accepted nsapp=%s status=%s", out.NSAPP, out.Status)
|
log.Printf("telemetry accepted nsapp=%s status=%s repo=%s", out.NSAPP, out.Status, in.RepoSource)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.WriteHeader(http.StatusAccepted)
|
w.WriteHeader(http.StatusAccepted)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user