package proxmox import ( "crypto/tls" "encoding/json" "errors" "fmt" "net" "net/http" "sync" "time" "github.com/patrickmn/go-cache" log "github.com/sirupsen/logrus" ) // Multipoint API HTTP client will make requests in round-robin fashion on all endpoints which are reachable. // The client will periodically check endpoints for liveness. type ApiClient struct { endpoints []*ApiEndpoint // API endpoints array. tokenId string // API token ID. secret string // API token secret. httpClient *http.Client // Internal HTTP client. checkInterval time.Duration // Check interval how often will APi client look for endpoints liveness. stop chan struct{} // Stop channel which is used in ticker. mutex sync.Mutex // Stop mutex to prevent requests making requests on unitialized API client instance. stopped bool // state of API client. index int // Round robin next index. aliveCount int // Count of alive endpoints. cache *cache.Cache // Cache instance. } // PVE API endpoint. type ApiEndpoint struct { host string // Endpoint address. alive bool // Endpoint liveness state. } // PVE API response. type ApiResponse struct { Data json.RawMessage `json:"data"` // Data message from PVE API. } // Create new instance of API HTTP client. func NewApiClient(endpoints []string, tokenId string, secret string, checkInterval time.Duration) *ApiClient { instance := ApiClient{ checkInterval: checkInterval, tokenId: tokenId, secret: secret, } // Cache initialization. instance.cache = cache.New(30*time.Second, 5*time.Second) // Prepare API endpoints. for _, endpoint := range endpoints { apiEndpoint := ApiEndpoint{ host: endpoint, alive: false, } instance.endpoints = append(instance.endpoints, &apiEndpoint) } // Configure HTTP transport. transport := &http.Transport{ Dial: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 60 * time.Second, DualStack: true, }).Dial, TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, DisableCompression: false, MaxIdleConns: 4, MaxIdleConnsPerHost: 4, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, } // Create instance of HTTP client with modified HTTP transport. instance.httpClient = &http.Client{ Timeout: time.Second * 10, Transport: transport, } return &instance } // Check endpoint liveness state. func (instance *ApiClient) checkEndpointsLiveness() { // We want to make sure other routines won't make any requests until we have checked for alive connections. instance.mutex.Lock() defer instance.mutex.Unlock() prevAliveCount := instance.aliveCount instance.aliveCount = 0 for _, endpoint := range instance.endpoints { endpoint.alive = false url := fmt.Sprintf("%s%s", endpoint.host, "api2/json/") req, err := http.NewRequest("GET", url, nil) if err != nil { log.Errorf("Liveness check of host %s failed. Error: %s. ", endpoint.host, err) continue } // Authentication token. req.Header.Set("Authorization", fmt.Sprintf("PVEAPIToken=%s=%s", instance.tokenId, instance.secret)) resp, err := instance.httpClient.Do(req) if err != nil { log.Debugf("Liveness check of host %s failed. Error: %s. ", endpoint.host, err) continue } if resp.StatusCode == 401 { log.Errorf("Liveness check of host %s failed. Error: Authentication failed.", endpoint.host) continue } if resp.StatusCode != 200 { log.Errorf("Liveness check of host %s failed. Status code: %d. ", endpoint.host, resp.StatusCode) continue } instance.aliveCount++ endpoint.alive = true } if prevAliveCount != instance.aliveCount { log.Infof("Checked Proxmox API hosts. Status: (%d/%d) hosts are UP.", instance.aliveCount, len(instance.endpoints)) } } // Return one alive endpoint Round-Robin. func (instance *ApiClient) getAliveEndpoint() (*ApiEndpoint, error) { var alive []*ApiEndpoint // Prepare only alive endpoints. for _, endpoint := range instance.endpoints { if endpoint.alive { alive = append(alive, endpoint) } } length := len(alive) // If there are no alive endpoints then return error. if length == 0 { return nil, errors.New("All API endpoints are unreachable.") } next := alive[instance.index%length] instance.index = (instance.index + 1) % length return next, nil } // Check if API client is stopped. func (instance *ApiClient) PerformRequest(method string, path string, params map[string]string) (*ApiResponse, error) { instance.mutex.Lock() defer instance.mutex.Unlock() response, found := instance.cache.Get(method + path) if found { r := response.(ApiResponse) return &r, nil } endpoint, err := instance.getAliveEndpoint() if err != nil { return nil, err } url := fmt.Sprintf("%sapi2/json%s", endpoint.host, path) req, err := http.NewRequest(method, url, nil) if err != nil { return nil, err } // Authentication token. req.Header.Set("Authorization", fmt.Sprintf("PVEAPIToken=%s=%s", instance.tokenId, instance.secret)) q := req.URL.Query() for key, element := range params { q.Add(key, element) } req.URL.RawQuery = q.Encode() req.Header.Set("Connection", "Keep-Alive") req.Header.Set("User-Agent", "Proxmox exporter") var maxAttempts int = 3 var attempts int = 0 var resp *http.Response for attempts <= maxAttempts { resp, err = instance.httpClient.Do(req) // If request is sucessful, then break the loop. if err == nil { break } // In case the request failed it will be attempted again in another loop iteration after second + attempts made. if err != nil && attempts < maxAttempts { log.Warnf("Request '%s' failed (%s). Attempting again in %d seconds.", url, err.Error(), attempts+1) time.Sleep(time.Duration(attempts+1) * time.Second) continue } // If all attempts fail then return the error. if err != nil && attempts == maxAttempts { return nil, err } attempts++ } defer resp.Body.Close() // Decode JSON to ApiResponse struct. result := ApiResponse{} err = json.NewDecoder(resp.Body).Decode(&result) if err != nil { return nil, err } else { instance.cache.Set(method+path, result, time.Second*5) return &result, nil } } // Check if API client is stopped. func (instance *ApiClient) IsStopped() bool { instance.mutex.Lock() defer instance.mutex.Unlock() return instance.stopped } // Start the API client. func (instance *ApiClient) Start() { ticker := time.NewTicker(instance.checkInterval) instance.checkEndpointsLiveness() go func() { for { select { case <-ticker.C: instance.checkEndpointsLiveness() case <-instance.stop: instance.mutex.Lock() defer instance.mutex.Unlock() instance.stopped = true return } } }() } // Close connections and stop the API client. func (instance *ApiClient) Stop() { instance.httpClient.CloseIdleConnections() close(instance.stop) }