CanbiZ (MickLesk) 0231b72d78
Add telemetry data service and dashboard revamp
Introduce a telemetry data microservice under misc/data: add a Dockerfile, entrypoint, migration tools, README, LICENSE, and .gitignore. Increase the Docker CACHE_TTL_SECONDS to 300 seconds. Implement extensive dashboard and analytics updates in dashboard.go: add total_all_time and sample_size, return total item counts from fetchRecords (with page/limit handling and a maxRecords guard), raise top-N limits, add a minimum-installs threshold for failed apps, and make numerous UI/style/layout improvements in the embedded DashboardHTML. Minor formatting tweak to misc/api.func.
2026-02-12 13:10:06 +01:00


//go:build ignore
// +build ignore

// Migration script to import data from the old API or a JSON file into PocketBase
// Run with: go run migrate.go
//
// Environment variables:
// JSON_FILE - Path to JSON file to import (if set, skips API fetch)
// SQL_OUTPUT - Path to output SQL file (direct SQLite import, fastest!)
// MIGRATION_SOURCE_URL - Base URL of source API (default: https://api.htl-braunau.at/data)
// POCKETBASE_URL - PocketBase URL (default: http://localhost:8090)
// POCKETBASE_COLLECTION - Target collection (default: telemetry)
// PB_IDENTITY - Auth username
// PB_PASSWORD - Auth password
// REPO_SOURCE - Value for repo_source field (e.g., "community-scripts" or "Proxmox VE")
// DATE_UNTIL - Only import records created on or before this date (format: YYYY-MM-DD)
// DATE_FROM - Only import records created on or after this date (format: YYYY-MM-DD)
// START_PAGE - Resume from this page (default: 1, API mode only)
// BATCH_SIZE - Records per batch (default: 500)
// SKIP_RECORDS - Skip first N records (JSON file mode, for resuming)
// WORKERS - Number of parallel HTTP workers (default: 50)
//
// SQL Output Mode (fastest - seconds for millions of records):
// $env:JSON_FILE = "data.json"
// $env:SQL_OUTPUT = "import.sql"
// $env:REPO_SOURCE = "Proxmox VE"
// .\migrate.exe
// # Then on server: sqlite3 /app/pb_data/data.db < import.sql
//
// JSON file format (array of objects):
// [{"id": "...", "ct_type": 1, "nsapp": "...", ...}, ...]
package main
import (
"bufio"
"bytes"
cryptoRand "crypto/rand"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
defaultSourceAPI = "https://api.htl-braunau.at/data"
defaultPBURL = "http://localhost:8090"
defaultBatchSize = 500
defaultWorkers = 50 // Parallel workers for importing
)
var (
sourceAPI string
summaryAPI string
authToken string
repoSource string
dateUntil time.Time
dateFrom time.Time
hasDateUntil bool
hasDateFrom bool
workerCount int = defaultWorkers
)
// OldDataModel represents the data structure from the old API
type OldDataModel struct {
ID string `json:"id"`
CtType int `json:"ct_type"`
DiskSize int `json:"disk_size"`
CoreCount int `json:"core_count"`
RamSize int `json:"ram_size"`
OsType string `json:"os_type"`
OsVersion string `json:"os_version"`
DisableIP6 string `json:"disableip6"`
NsApp string `json:"nsapp"`
Method string `json:"method"`
CreatedAt string `json:"created_at"`
PveVersion string `json:"pve_version"`
Status string `json:"status"`
RandomID string `json:"random_id"`
Type string `json:"type"`
Error string `json:"error"`
}
// MongoNumberLong represents MongoDB's $numberLong type
type MongoNumberLong struct {
Value string `json:"$numberLong"`
}
// MongoDate represents MongoDB's $date type
type MongoDate struct {
Value string `json:"$date"`
}
// MongoOID represents MongoDB's $oid type
type MongoOID struct {
Value string `json:"$oid"`
}
// MongoDataModel represents MongoDB Extended JSON export format
type MongoDataModel struct {
MongoID MongoOID `json:"_id"`
ID json.RawMessage `json:"id"`
CtType json.RawMessage `json:"ct_type"`
DiskSize json.RawMessage `json:"disk_size"`
CoreCount json.RawMessage `json:"core_count"`
RamSize json.RawMessage `json:"ram_size"`
OsType string `json:"os_type"`
OsVersion string `json:"os_version"`
DisableIP6 string `json:"disable_ip6"`
NsApp string `json:"nsapp"`
Method string `json:"method"`
CreatedAt json.RawMessage `json:"created_at"`
PveVersion string `json:"pveversion"`
Status string `json:"status"`
RandomID string `json:"random_id"`
Type string `json:"type"`
Error *string `json:"error"`
}
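// A record in a MongoDB Extended JSON dump typically looks like this
// (field values are illustrative):
//
//	{"_id":{"$oid":"65a0f3..."},"ct_type":{"$numberLong":"1"},"disk_size":{"$numberLong":"8"},
//	 "created_at":{"$date":"2024-01-01T12:00:00Z"},"nsapp":"plex","status":"done"}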
// parseMongoInt extracts int from MongoDB $numberLong or plain number
func parseMongoInt(raw json.RawMessage) int {
if len(raw) == 0 {
return 0
}
// Try $numberLong first
var numLong MongoNumberLong
if err := json.Unmarshal(raw, &numLong); err == nil && numLong.Value != "" {
if n, err := strconv.Atoi(numLong.Value); err == nil {
return n
}
}
// Try plain number
var n int
if err := json.Unmarshal(raw, &n); err == nil {
return n
}
return 0
}
// parseMongoDate extracts date string from MongoDB $date or plain string
func parseMongoDate(raw json.RawMessage) string {
if len(raw) == 0 {
return ""
}
// Try $date first
var mongoDate MongoDate
if err := json.Unmarshal(raw, &mongoDate); err == nil && mongoDate.Value != "" {
return mongoDate.Value
}
// Try plain string
var s string
if err := json.Unmarshal(raw, &s); err == nil {
return s
}
return ""
}
// parseMongoString extracts string from MongoDB $numberLong or plain string/number
func parseMongoString(raw json.RawMessage) string {
if len(raw) == 0 {
return ""
}
// Try $numberLong first (MongoDB sometimes exports IDs as numbers)
var numLong MongoNumberLong
if err := json.Unmarshal(raw, &numLong); err == nil && numLong.Value != "" {
return numLong.Value
}
// Try plain string
var s string
if err := json.Unmarshal(raw, &s); err == nil {
return s
}
// Try plain number
var n int64
if err := json.Unmarshal(raw, &n); err == nil {
return strconv.FormatInt(n, 10)
}
return ""
}
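// Taken together, these helpers accept both wrapped and plain values, e.g.
// {"$numberLong":"4096"} and 4096 both parse to 4096, and {"$date":"2024-01-01T00:00:00Z"}
// and "2024-01-01T00:00:00Z" both yield the same date string (values illustrative).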
// convertMongoToOld converts MongoDB format to OldDataModel
func convertMongoToOld(m MongoDataModel) OldDataModel {
errorStr := ""
if m.Error != nil {
errorStr = *m.Error
}
return OldDataModel{
CtType: parseMongoInt(m.CtType),
DiskSize: parseMongoInt(m.DiskSize),
CoreCount: parseMongoInt(m.CoreCount),
RamSize: parseMongoInt(m.RamSize),
OsType: m.OsType,
OsVersion: m.OsVersion,
DisableIP6: m.DisableIP6,
NsApp: m.NsApp,
Method: m.Method,
CreatedAt: parseMongoDate(m.CreatedAt),
PveVersion: m.PveVersion,
Status: m.Status,
RandomID: m.RandomID,
Type: m.Type,
Error: errorStr,
}
}
// PBRecord represents the PocketBase record format
type PBRecord struct {
CtType int `json:"ct_type"`
DiskSize int `json:"disk_size"`
CoreCount int `json:"core_count"`
RamSize int `json:"ram_size"`
OsType string `json:"os_type"`
OsVersion string `json:"os_version"`
DisableIP6 string `json:"disableip6"`
NsApp string `json:"nsapp"`
Method string `json:"method"`
PveVersion string `json:"pve_version"`
Status string `json:"status"`
RandomID string `json:"random_id"`
Type string `json:"type"`
Error string `json:"error"`
RepoSource string `json:"repo_source,omitempty"`
OldCreated string `json:"old_created,omitempty"`
}
type Summary struct {
TotalEntries int `json:"total_entries"`
}
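// The /summary endpoint is expected to return JSON of the form
// {"total_entries": 1234567} (count illustrative).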
type ImportJob struct {
Record OldDataModel
Index int
}
type ImportResult struct {
Success bool
Skipped bool
Error error
}
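// Both the API and JSON-file import paths share one worker-pool pattern:
// ImportJob values are fanned out over a jobs channel to workerCount goroutines,
// each of which calls importRecordWithRetry and sends an ImportResult back on a
// shared results channel.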
func main() {
// Setup source URLs
baseURL := os.Getenv("MIGRATION_SOURCE_URL")
if baseURL == "" {
baseURL = defaultSourceAPI
}
sourceAPI = baseURL + "/paginated"
summaryAPI = baseURL + "/summary"
// Repo source (to distinguish data origins)
repoSource = os.Getenv("REPO_SOURCE")
// Date filters
if dateStr := os.Getenv("DATE_UNTIL"); dateStr != "" {
parsed, err := time.Parse("2006-01-02", dateStr)
if err != nil {
fmt.Printf("[ERROR] Invalid DATE_UNTIL format (use YYYY-MM-DD): %v\n", err)
os.Exit(1)
}
dateUntil = parsed.Add(24*time.Hour - time.Second) // End of day
hasDateUntil = true
}
if dateStr := os.Getenv("DATE_FROM"); dateStr != "" {
parsed, err := time.Parse("2006-01-02", dateStr)
if err != nil {
fmt.Printf("[ERROR] Invalid DATE_FROM format (use YYYY-MM-DD): %v\n", err)
os.Exit(1)
}
dateFrom = parsed // Start of day
hasDateFrom = true
}
// Batch size
batchSize := defaultBatchSize
if bs := os.Getenv("BATCH_SIZE"); bs != "" {
if n, err := strconv.Atoi(bs); err == nil && n > 0 {
batchSize = n
}
}
// Start page (for resuming)
startPage := 1
if sp := os.Getenv("START_PAGE"); sp != "" {
if n, err := strconv.Atoi(sp); err == nil && n > 0 {
startPage = n
}
}
// PocketBase URL
pbURL := os.Getenv("POCKETBASE_URL")
if pbURL == "" {
pbURL = os.Getenv("PB_URL")
}
if pbURL == "" {
pbURL = defaultPBURL
}
// Collection
pbCollection := os.Getenv("POCKETBASE_COLLECTION")
if pbCollection == "" {
pbCollection = os.Getenv("PB_TARGET_COLLECTION")
}
if pbCollection == "" {
pbCollection = "telemetry"
}
// Auth collection
authCollection := os.Getenv("PB_AUTH_COLLECTION")
if authCollection == "" {
authCollection = "telemetry_service_user"
}
// Credentials
pbIdentity := os.Getenv("PB_IDENTITY")
pbPassword := os.Getenv("PB_PASSWORD")
// Workers count
if wc := os.Getenv("WORKERS"); wc != "" {
if n, err := strconv.Atoi(wc); err == nil && n > 0 {
workerCount = n
}
}
// Check for SQL output mode (fastest - no network!)
jsonFile := os.Getenv("JSON_FILE")
sqlOutput := os.Getenv("SQL_OUTPUT")
if jsonFile != "" && sqlOutput != "" {
runSQLExport(jsonFile, sqlOutput, pbCollection)
return
}
// Check for JSON file import mode (via HTTP API)
if jsonFile != "" {
// Authenticate first
if pbIdentity != "" && pbPassword != "" {
fmt.Println("[AUTH] Authenticating with PocketBase...")
err := authenticate(pbURL, authCollection, pbIdentity, pbPassword)
if err != nil {
fmt.Printf("[ERROR] Authentication failed: %v\n", err)
os.Exit(1)
}
fmt.Println("[OK] Authentication successful")
}
// Skip records (for resuming)
var skipRecords int64
if sr := os.Getenv("SKIP_RECORDS"); sr != "" {
if n, err := strconv.ParseInt(sr, 10, 64); err == nil && n > 0 {
skipRecords = n
}
}
runJSONFileImport(jsonFile, pbURL, pbCollection, batchSize, skipRecords)
return
}
fmt.Println("=========================================================")
fmt.Println(" Data Migration to PocketBase")
fmt.Println("=========================================================")
fmt.Printf("Source API: %s\n", baseURL)
fmt.Printf("PocketBase URL: %s\n", pbURL)
fmt.Printf("Collection: %s\n", pbCollection)
fmt.Printf("Batch Size: %d\n", batchSize)
fmt.Printf("Workers: %d\n", workerCount)
if repoSource != "" {
fmt.Printf("Repo Source: %s\n", repoSource)
}
if hasDateFrom {
fmt.Printf("Date From: >= %s\n", dateFrom.Format("2006-01-02"))
}
if hasDateUntil {
fmt.Printf("Date Until: <= %s\n", dateUntil.Format("2006-01-02"))
}
if startPage > 1 {
fmt.Printf("Starting Page: %d\n", startPage)
}
fmt.Println("---------------------------------------------------------")
// Authenticate with PocketBase
if pbIdentity != "" && pbPassword != "" {
fmt.Println("[AUTH] Authenticating with PocketBase...")
err := authenticate(pbURL, authCollection, pbIdentity, pbPassword)
if err != nil {
fmt.Printf("[ERROR] Authentication failed: %v\n", err)
os.Exit(1)
}
fmt.Println("[OK] Authentication successful")
} else {
fmt.Println("[WARN] No credentials provided, trying without auth...")
}
fmt.Println("---------------------------------------------------------")
// Get total count
summary, err := getSummary()
if err != nil {
fmt.Printf("[ERROR] Failed to get summary: %v\n", err)
os.Exit(1)
}
fmt.Printf("[INFO] Total entries in source: %d\n", summary.TotalEntries)
fmt.Println("---------------------------------------------------------")
// Calculate pages
totalPages := (summary.TotalEntries + batchSize - 1) / batchSize
fmt.Printf("Starting migration (%d pages to process)...\n", totalPages)
fmt.Println()
var totalMigrated, totalFailed, totalSkipped, totalFiltered int64
startTime := time.Now()
// Progress tracking
processedRecords := int64(0)
for page := startPage; page <= totalPages; page++ {
pageStart := time.Now()
data, err := fetchPage(page, batchSize)
if err != nil {
fmt.Printf("[Page %d] ERROR: Failed to fetch: %v\n", page, err)
atomic.AddInt64(&totalFailed, int64(batchSize))
continue
}
// Filter by date if needed
var filteredData []OldDataModel
var lastTimestamp string
pageFiltered := 0
for _, record := range data {
recordDate, err := parseTimestamp(record.CreatedAt)
if err != nil {
// Skip records with unparseable dates
atomic.AddInt64(&totalFiltered, 1)
pageFiltered++
continue
}
// Check DATE_FROM (skip if before)
if hasDateFrom && recordDate.Before(dateFrom) {
atomic.AddInt64(&totalFiltered, 1)
pageFiltered++
continue
}
// Check DATE_UNTIL (skip if after)
if hasDateUntil && recordDate.After(dateUntil) {
atomic.AddInt64(&totalFiltered, 1)
pageFiltered++
continue
}
filteredData = append(filteredData, record)
lastTimestamp = record.CreatedAt
}
atomic.AddInt64(&processedRecords, int64(len(data)))
// Show progress even if all filtered
if len(filteredData) == 0 {
if page <= 5 || page%100 == 0 {
fmt.Printf("[Page %d/%d] All %d filtered (after DATE_UNTIL) | Total filtered: %d\n",
page, totalPages, pageFiltered, atomic.LoadInt64(&totalFiltered))
}
continue
}
// Worker pool for importing
jobs := make(chan ImportJob, len(filteredData))
results := make(chan ImportResult, len(filteredData))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
result := importRecordWithRetry(pbURL, pbCollection, job.Record, 3)
results <- result
}
}()
}
// Send jobs
for i, record := range filteredData {
jobs <- ImportJob{Record: record, Index: i}
}
close(jobs)
// Wait for workers to finish
go func() {
wg.Wait()
close(results)
}()
// Collect results
pageMigrated, pageSkipped, pageFailed := 0, 0, 0
var lastError error
for result := range results {
if result.Skipped {
pageSkipped++
} else if result.Success {
pageMigrated++
} else {
pageFailed++
if lastError == nil && result.Error != nil {
lastError = result.Error
}
}
}
// Log first error on the page
if pageFailed > 0 && lastError != nil && page <= 3 {
fmt.Printf("[Page %d] Sample error: %v\n", page, lastError)
}
atomic.AddInt64(&totalMigrated, int64(pageMigrated))
atomic.AddInt64(&totalSkipped, int64(pageSkipped))
atomic.AddInt64(&totalFailed, int64(pageFailed))
// Progress display (first 5 pages, then every 10 pages)
if page <= 5 || page%10 == 0 || page == totalPages {
elapsed := time.Since(startTime)
processed := atomic.LoadInt64(&processedRecords)
rate := float64(processed) / elapsed.Seconds()
remaining := float64(summary.TotalEntries-int(processed)) / rate
eta := time.Duration(remaining) * time.Second
// Format last timestamp for display
lastDate := ""
if lastTimestamp != "" {
if t, err := parseTimestamp(lastTimestamp); err == nil {
lastDate = t.Format("2006-01-02")
}
}
fmt.Printf("[Page %d/%d] Migrated: %d | Skipped: %d | Failed: %d | Filtered: %d | %.0f rec/s | Last: %s | ETA: %s\n",
page, totalPages,
atomic.LoadInt64(&totalMigrated),
atomic.LoadInt64(&totalSkipped),
atomic.LoadInt64(&totalFailed),
atomic.LoadInt64(&totalFiltered),
rate,
lastDate,
formatDuration(eta))
}
// Adaptive delay based on page processing time
pageTime := time.Since(pageStart)
if pageTime < 500*time.Millisecond {
time.Sleep(100 * time.Millisecond)
}
}
fmt.Println()
fmt.Println("=========================================================")
fmt.Println(" Migration Complete")
fmt.Println("=========================================================")
fmt.Printf("Successfully migrated: %d\n", atomic.LoadInt64(&totalMigrated))
fmt.Printf("Skipped (duplicates): %d\n", atomic.LoadInt64(&totalSkipped))
fmt.Printf("Filtered (date): %d\n", atomic.LoadInt64(&totalFiltered))
fmt.Printf("Failed: %d\n", atomic.LoadInt64(&totalFailed))
fmt.Printf("Duration: %s\n", formatDuration(time.Since(startTime)))
fmt.Println("=========================================================")
if atomic.LoadInt64(&totalMigrated) > 0 {
fmt.Println()
fmt.Println("Next steps for timestamp migration:")
fmt.Printf(" 1. SSH into your PocketBase server\n")
fmt.Printf(" 2. Run: sqlite3 /app/pb_data/data.db \".tables\"\n")
fmt.Printf(" 3. Find your collection table name\n")
fmt.Printf(" 4. Run: sqlite3 /app/pb_data/data.db \"UPDATE <table_name> SET created = old_created, updated = old_created WHERE old_created IS NOT NULL AND old_created != ''\"\n")
fmt.Printf(" 5. Remove the old_created field from the collection in PocketBase Admin UI\n")
}
}
func formatDuration(d time.Duration) string {
if d < 0 {
return "calculating..."
}
h := d / time.Hour
d -= h * time.Hour
m := d / time.Minute
d -= m * time.Minute
s := d / time.Second
if h > 0 {
return fmt.Sprintf("%dh %dm %ds", h, m, s)
}
if m > 0 {
return fmt.Sprintf("%dm %ds", m, s)
}
return fmt.Sprintf("%ds", s)
}
func parseTimestamp(ts string) (time.Time, error) {
if ts == "" {
return time.Time{}, fmt.Errorf("empty timestamp")
}
formats := []string{
time.RFC3339,
time.RFC3339Nano,
"2006-01-02T15:04:05.000Z",
"2006-01-02T15:04:05Z",
"2006-01-02T15:04:05",
"2006-01-02 15:04:05",
"2006-01-02 15:04:05.000",
"2006-01-02 15:04:05.000 UTC",
"2006-01-02T15:04:05.000+00:00",
}
for _, format := range formats {
if t, err := time.Parse(format, ts); err == nil {
return t, nil
}
}
return time.Time{}, fmt.Errorf("cannot parse: %s", ts)
}
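// convertTimestamp normalizes any timestamp parseTimestamp understands into the
// layout stored in the old_created field, e.g. "2024-01-01T12:00:00Z" becomes
// "2024-01-01 12:00:00.000Z" (example values illustrative).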
func convertTimestamp(ts string) string {
if ts == "" {
return ""
}
t, err := parseTimestamp(ts)
if err != nil {
return ""
}
return t.UTC().Format("2006-01-02 15:04:05.000Z")
}
func getSummary() (*Summary, error) {
resp, err := http.Get(summaryAPI)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
}
var summary Summary
if err := json.NewDecoder(resp.Body).Decode(&summary); err != nil {
return nil, err
}
return &summary, nil
}
func authenticate(pbURL, authCollection, identity, password string) error {
body := map[string]string{
"identity": identity,
"password": password,
}
jsonData, _ := json.Marshal(body)
url := fmt.Sprintf("%s/api/collections/%s/auth-with-password", pbURL, authCollection)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
}
var result struct {
Token string `json:"token"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return err
}
if result.Token == "" {
return fmt.Errorf("no token in response")
}
authToken = result.Token
return nil
}
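// fetchPage retrieves one page from the paginated source API. With the defaults
// above, fetchPage(1, 500) requests
// https://api.htl-braunau.at/data/paginated?page=1&limit=500 (values illustrative).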
func fetchPage(page, limit int) ([]OldDataModel, error) {
url := fmt.Sprintf("%s?page=%d&limit=%d", sourceAPI, page, limit)
client := &http.Client{Timeout: 60 * time.Second}
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
}
var data []OldDataModel
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
return data, nil
}
func importRecordWithRetry(pbURL, collection string, old OldDataModel, maxRetries int) ImportResult {
var lastErr error
for i := 0; i < maxRetries; i++ {
err := importRecord(pbURL, collection, old)
if err == nil {
return ImportResult{Success: true}
}
if isUniqueViolation(err) {
return ImportResult{Skipped: true}
}
lastErr = err
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
}
return ImportResult{Error: lastErr}
}
func importRecord(pbURL, collection string, old OldDataModel) error {
// Map status: "done" -> "success"
status := old.Status
switch status {
case "done":
status = "success"
case "installing", "failed", "unknown", "success":
// keep as-is
default:
status = "unknown"
}
// Map ct_type: 1=unprivileged (0), 2=privileged (1)
// PocketBase schema: 0 = unprivileged, 1 = privileged
ctType := 0 // default: unprivileged
if old.CtType == 2 {
ctType = 1 // privileged
}
// Ensure type is set
recordType := old.Type
if recordType == "" {
recordType = "lxc"
}
// Ensure nsapp is set (required field)
nsapp := old.NsApp
if nsapp == "" {
nsapp = "unknown"
}
record := PBRecord{
CtType: ctType,
DiskSize: old.DiskSize,
CoreCount: old.CoreCount,
RamSize: old.RamSize,
OsType: old.OsType,
OsVersion: old.OsVersion,
DisableIP6: old.DisableIP6,
NsApp: nsapp,
Method: old.Method,
PveVersion: old.PveVersion,
Status: status,
RandomID: old.RandomID,
Type: recordType,
Error: old.Error,
RepoSource: repoSource,
OldCreated: convertTimestamp(old.CreatedAt),
}
jsonData, err := json.Marshal(record)
if err != nil {
return err
}
url := fmt.Sprintf("%s/api/collections/%s/records", pbURL, collection)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
if authToken != "" {
req.Header.Set("Authorization", "Bearer "+authToken)
}
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
}
return nil
}
func isUniqueViolation(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "UNIQUE constraint failed") ||
strings.Contains(errStr, "duplicate") ||
strings.Contains(errStr, "already exists") ||
strings.Contains(errStr, "validation_not_unique")
}
// runJSONFileImport handles importing from a JSON file using streaming
func runJSONFileImport(jsonFile, pbURL, pbCollection string, batchSize int, skipRecords int64) {
fmt.Println("=========================================================")
fmt.Println(" JSON File Import to PocketBase")
fmt.Println("=========================================================")
fmt.Printf("JSON File: %s\n", jsonFile)
fmt.Printf("PocketBase URL: %s\n", pbURL)
fmt.Printf("Collection: %s\n", pbCollection)
fmt.Printf("Batch Size: %d\n", batchSize)
fmt.Printf("Workers: %d\n", workerCount)
if repoSource != "" {
fmt.Printf("Repo Source: %s\n", repoSource)
}
if hasDateFrom {
fmt.Printf("Date From: >= %s\n", dateFrom.Format("2006-01-02"))
}
if hasDateUntil {
fmt.Printf("Date Until: <= %s\n", dateUntil.Format("2006-01-02"))
}
if skipRecords > 0 {
fmt.Printf("Skip Records: %d\n", skipRecords)
}
fmt.Println("---------------------------------------------------------")
// Open file
file, err := os.Open(jsonFile)
if err != nil {
fmt.Printf("[ERROR] Cannot open file: %v\n", err)
os.Exit(1)
}
defer file.Close()
// Get file size for progress
fileInfo, _ := file.Stat()
fileSize := fileInfo.Size()
fmt.Printf("[INFO] File size: %.2f GB\n", float64(fileSize)/(1024*1024*1024))
// Auto-detect MongoDB Extended JSON format by peeking at the first 2KB
peekBuf := make([]byte, 2048)
n, _ := file.Read(peekBuf)
peekStr := string(peekBuf[:n])
isMongoFormat := strings.Contains(peekStr, `"$oid"`) ||
strings.Contains(peekStr, `"$numberLong"`) ||
strings.Contains(peekStr, `"$date"`)
// Reset file to beginning
file.Seek(0, io.SeekStart)
if isMongoFormat {
fmt.Println("[INFO] Detected MongoDB Extended JSON format")
} else {
fmt.Println("[INFO] Detected standard JSON format")
}
// Use buffered reader for better performance
reader := bufio.NewReaderSize(file, 64*1024*1024) // 64MB buffer
// Create JSON decoder for streaming
decoder := json.NewDecoder(reader)
// Expect array start
token, err := decoder.Token()
if err != nil {
fmt.Printf("[ERROR] Cannot read JSON: %v\n", err)
os.Exit(1)
}
if delim, ok := token.(json.Delim); !ok || delim != '[' {
fmt.Printf("[ERROR] JSON file must be an array (expected '[', got %v)\n", token)
os.Exit(1)
}
fmt.Println("[OK] JSON array detected, starting streaming import...")
fmt.Println("---------------------------------------------------------")
var totalMigrated, totalFailed, totalSkipped, totalFiltered int64
var recordCount int64
startTime := time.Now()
// Batch processing
batch := make([]OldDataModel, 0, batchSize)
batchNum := 0
for decoder.More() {
var record OldDataModel
if isMongoFormat {
// Decode as MongoDB Extended JSON
var mongoRecord MongoDataModel
if err := decoder.Decode(&mongoRecord); err != nil {
fmt.Printf("[WARN] Failed to decode record %d: %v\n", recordCount+1, err)
atomic.AddInt64(&totalFailed, 1)
recordCount++
continue
}
record = convertMongoToOld(mongoRecord)
} else {
// Decode as standard JSON
if err := decoder.Decode(&record); err != nil {
fmt.Printf("[WARN] Failed to decode record %d: %v\n", recordCount+1, err)
atomic.AddInt64(&totalFailed, 1)
recordCount++
continue
}
}
recordCount++
// Skip records (for resuming)
if recordCount <= skipRecords {
if recordCount%100000 == 0 {
fmt.Printf("[SKIP] Skipped %d / %d records...\n", recordCount, skipRecords)
}
continue
}
// Date filter
if hasDateFrom || hasDateUntil {
recordDate, err := parseTimestamp(record.CreatedAt)
if err != nil {
atomic.AddInt64(&totalFiltered, 1)
continue
}
if hasDateFrom && recordDate.Before(dateFrom) {
atomic.AddInt64(&totalFiltered, 1)
continue
}
if hasDateUntil && recordDate.After(dateUntil) {
atomic.AddInt64(&totalFiltered, 1)
continue
}
}
batch = append(batch, record)
// Process batch when full
if len(batch) >= batchSize {
batchNum++
migrated, skipped, failed := processBatch(pbURL, pbCollection, batch, batchNum, recordCount, startTime)
atomic.AddInt64(&totalMigrated, int64(migrated))
atomic.AddInt64(&totalSkipped, int64(skipped))
atomic.AddInt64(&totalFailed, int64(failed))
batch = batch[:0] // Reset batch
}
}
// Process remaining batch
if len(batch) > 0 {
batchNum++
migrated, skipped, failed := processBatch(pbURL, pbCollection, batch, batchNum, recordCount, startTime)
atomic.AddInt64(&totalMigrated, int64(migrated))
atomic.AddInt64(&totalSkipped, int64(skipped))
atomic.AddInt64(&totalFailed, int64(failed))
}
fmt.Println()
fmt.Println("=========================================================")
fmt.Println(" JSON File Import Complete")
fmt.Println("=========================================================")
fmt.Printf("Total records read: %d\n", recordCount)
fmt.Printf("Successfully imported: %d\n", atomic.LoadInt64(&totalMigrated))
fmt.Printf("Skipped (duplicates): %d\n", atomic.LoadInt64(&totalSkipped))
fmt.Printf("Filtered (date): %d\n", atomic.LoadInt64(&totalFiltered))
fmt.Printf("Failed: %d\n", atomic.LoadInt64(&totalFailed))
fmt.Printf("Duration: %s\n", formatDuration(time.Since(startTime)))
fmt.Println("=========================================================")
if atomic.LoadInt64(&totalMigrated) > 0 {
fmt.Println()
fmt.Println("Next steps for timestamp migration:")
fmt.Printf(" 1. SSH into your PocketBase server\n")
fmt.Printf(" 2. Run: sqlite3 /app/pb_data/data.db \".tables\"\n")
fmt.Printf(" 3. Find your collection table name\n")
fmt.Printf(" 4. Run: sqlite3 /app/pb_data/data.db \"UPDATE <table_name> SET created = old_created, updated = old_created WHERE old_created IS NOT NULL AND old_created != ''\"\n")
fmt.Printf(" 5. Remove the old_created field from the collection in PocketBase Admin UI\n")
}
}
func processBatch(pbURL, pbCollection string, records []OldDataModel, batchNum int, totalRead int64, startTime time.Time) (migrated, skipped, failed int) {
batchStart := time.Now()
// Worker pool
jobs := make(chan ImportJob, len(records))
results := make(chan ImportResult, len(records))
var wg sync.WaitGroup
// Start workers
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
result := importRecordWithRetry(pbURL, pbCollection, job.Record, 3)
results <- result
}
}()
}
// Send jobs
for i, record := range records {
jobs <- ImportJob{Record: record, Index: i}
}
close(jobs)
// Wait and close results
go func() {
wg.Wait()
close(results)
}()
// Collect results
var firstError error
for result := range results {
if result.Skipped {
skipped++
} else if result.Success {
migrated++
} else {
failed++
if firstError == nil && result.Error != nil {
firstError = result.Error
}
}
}
// Progress display
elapsed := time.Since(startTime)
rate := float64(totalRead) / elapsed.Seconds()
var lastDate string
if len(records) > 0 {
if t, err := parseTimestamp(records[len(records)-1].CreatedAt); err == nil {
lastDate = t.Format("2006-01-02")
}
}
fmt.Printf("[Batch %d] Read: %d | Migrated: %d | Skipped: %d | Failed: %d | %.0f rec/s | Last: %s | Batch: %v\n",
batchNum, totalRead, migrated, skipped, failed, rate, lastDate, time.Since(batchStart).Round(time.Millisecond))
// Show first error for debugging
if firstError != nil && batchNum <= 3 {
fmt.Printf(" [ERROR] Sample error: %v\n", firstError)
}
return migrated, skipped, failed
}
// runSQLExport generates a SQL file for direct SQLite import (fastest method!)
func runSQLExport(jsonFile, sqlOutput, tableName string) {
fmt.Println("=========================================================")
fmt.Println(" SQL Export Mode (Direct SQLite Import)")
fmt.Println("=========================================================")
fmt.Printf("JSON File: %s\n", jsonFile)
fmt.Printf("SQL Output: %s\n", sqlOutput)
fmt.Printf("Table: %s\n", tableName)
fmt.Printf("Repo Source: %s\n", repoSource)
fmt.Println("---------------------------------------------------------")
// Open JSON file
file, err := os.Open(jsonFile)
if err != nil {
fmt.Printf("[ERROR] Cannot open JSON file: %v\n", err)
os.Exit(1)
}
defer file.Close()
// Get file size for progress
fileInfo, _ := file.Stat()
fileSize := fileInfo.Size()
fmt.Printf("[INFO] File size: %.2f GB\n", float64(fileSize)/(1024*1024*1024))
// Open SQL output file stream
sqlFile, err := os.Create(sqlOutput)
if err != nil {
fmt.Printf("[ERROR] Cannot create SQL file: %v\n", err)
os.Exit(1)
}
defer sqlFile.Close()
writer := bufio.NewWriterSize(sqlFile, 1*1024*1024) // 1MB buffer for faster feedback
// Write SQL header
writer.WriteString("-- Auto-generated SQL import for PocketBase\n")
writer.WriteString("-- Generated: " + time.Now().Format(time.RFC3339) + "\n")
writer.WriteString("PRAGMA journal_mode=WAL;\n")
writer.WriteString("PRAGMA synchronous=OFF;\n")
writer.WriteString("PRAGMA cache_size=100000;\n")
writer.WriteString("BEGIN TRANSACTION;\n\n")
writer.Flush() // Flush header immediately
fmt.Println("[INFO] SQL header written, starting record processing...")
startTime := time.Now()
var recordCount int64
var filteredCount int64
var skippedCount int64
// Create decoder directly from file (not buffered reader)
decoder := json.NewDecoder(file)
// Read opening bracket of array
fmt.Println("[INFO] Reading JSON array...")
token, err := decoder.Token()
if err != nil {
fmt.Printf("[ERROR] Cannot read JSON: %v\n", err)
os.Exit(1)
}
if delim, ok := token.(json.Delim); !ok || delim != '[' {
fmt.Printf("[ERROR] Expected JSON array, got: %v\n", token)
os.Exit(1)
}
fmt.Println("[INFO] Found JSON array, starting decode...")
// Decode each element
for decoder.More() {
var mongoRecord MongoDataModel
if err := decoder.Decode(&mongoRecord); err != nil {
if err == io.EOF {
break
}
skippedCount++
// Log first few errors for debugging
if skippedCount <= 5 {
fmt.Printf("[WARN] Skipping malformed record #%d: %v\n", skippedCount, err)
}
continue
}
// Convert MongoDB format to normal format
record := OldDataModel{
ID: parseMongoString(mongoRecord.ID),
CtType: parseMongoInt(mongoRecord.CtType),
DiskSize: parseMongoInt(mongoRecord.DiskSize),
CoreCount: parseMongoInt(mongoRecord.CoreCount),
RamSize: parseMongoInt(mongoRecord.RamSize),
OsType: mongoRecord.OsType,
OsVersion: mongoRecord.OsVersion,
DisableIP6: mongoRecord.DisableIP6,
NsApp: mongoRecord.NsApp,
Method: mongoRecord.Method,
CreatedAt: parseMongoDate(mongoRecord.CreatedAt),
PveVersion: mongoRecord.PveVersion,
Status: mongoRecord.Status,
RandomID: mongoRecord.RandomID,
Type: mongoRecord.Type,
}
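// Note: the SQL export writes ct_type exactly as found in the source; the HTTP
// import path (importRecord) additionally remaps 1 -> 0 and 2 -> 1.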
if mongoRecord.Error != nil {
record.Error = *mongoRecord.Error
}
// Apply date filters
if hasDateFrom || hasDateUntil {
recordTime, err := parseTimestamp(record.CreatedAt)
if err == nil {
if hasDateFrom && recordTime.Before(dateFrom) {
filteredCount++
continue
}
if hasDateUntil && recordTime.After(dateUntil) {
filteredCount++
continue
}
}
}
// Generate unique ID for PocketBase
pbID := generatePocketBaseID()
// Normalize values
status := record.Status
if status == "done" {
status = "success"
}
if status == "" {
status = "unknown"
}
nsapp := record.NsApp
if nsapp == "" {
nsapp = "unknown"
}
recType := record.Type
if recType == "" {
recType = "lxc"
}
// Format created date
createdAt := record.CreatedAt
if createdAt == "" {
createdAt = time.Now().UTC().Format("2006-01-02 15:04:05.000Z")
}
// Escape strings for SQL
escapeSQLString := func(s string) string {
return strings.ReplaceAll(s, "'", "''")
}
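// Doubling single quotes is the standard escape for SQLite string literals;
// numeric columns are written unquoted.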
// Write INSERT statement (disableip6 removed - column no longer exists)
sql := fmt.Sprintf(
"INSERT OR IGNORE INTO %s (id,created,updated,ct_type,disk_size,core_count,ram_size,os_type,os_version,nsapp,method,pve_version,status,random_id,type,error,repo_source) VALUES ('%s','%s','%s',%d,%d,%d,%d,'%s','%s','%s','%s','%s','%s','%s','%s','%s','%s');\n",
tableName,
pbID,
escapeSQLString(createdAt),
escapeSQLString(createdAt),
record.CtType,
record.DiskSize,
record.CoreCount,
record.RamSize,
escapeSQLString(record.OsType),
escapeSQLString(record.OsVersion),
escapeSQLString(nsapp),
escapeSQLString(record.Method),
escapeSQLString(record.PveVersion),
escapeSQLString(status),
escapeSQLString(record.RandomID),
escapeSQLString(recType),
escapeSQLString(record.Error),
escapeSQLString(repoSource),
)
writer.WriteString(sql)
recordCount++
// Progress every 10k records (and flush to show file growing)
if recordCount%10000 == 0 {
writer.Flush()
elapsed := time.Since(startTime)
rate := float64(recordCount) / elapsed.Seconds()
fmt.Printf("[PROGRESS] %d records processed (%.0f rec/s)\n", recordCount, rate)
}
}
// Write footer
writer.WriteString("\nCOMMIT;\n")
writer.Flush()
elapsed := time.Since(startTime)
rate := float64(recordCount) / elapsed.Seconds()
fmt.Println()
fmt.Println("=========================================================")
fmt.Println(" SQL Export Complete")
fmt.Println("=========================================================")
fmt.Printf("Records exported: %d\n", recordCount)
fmt.Printf("Skipped (errors): %d\n", skippedCount)
fmt.Printf("Filtered (date): %d\n", filteredCount)
fmt.Printf("Duration: %s\n", formatDuration(elapsed))
fmt.Printf("Speed: %.0f records/sec\n", rate)
fmt.Printf("Output file: %s\n", sqlOutput)
fmt.Println("---------------------------------------------------------")
fmt.Println()
fmt.Println("To import into PocketBase:")
fmt.Printf(" sqlite3 /app/pb_data/data.db < %s\n", sqlOutput)
fmt.Println()
}
// generatePocketBaseID generates a 15-char alphanumeric ID like PocketBase does
func generatePocketBaseID() string {
const chars = "abcdefghijklmnopqrstuvwxyz0123456789"
b := make([]byte, 15)
// Use crypto/rand for randomness
randBytes := make([]byte, 15)
if _, err := io.ReadFull(cryptoRand.Reader, randBytes); err != nil {
// Fallback to time-based
for i := range b {
b[i] = chars[(time.Now().UnixNano()+int64(i))%int64(len(chars))]
}
return string(b)
}
for i := range b {
b[i] = chars[randBytes[i]%byte(len(chars))]
}
return string(b)
}