Files
pve-exporter/proxmox/api_client.go
Jan Lošťák a1ab163804 Initial commit
2024-05-27 21:27:07 +02:00

258 lines
6.9 KiB
Go

package proxmox
import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
"github.com/patrickmn/go-cache"
log "github.com/sirupsen/logrus"
)
// ApiClient is a multi-endpoint HTTP client for the PVE API. Requests are
// distributed round-robin across the endpoints currently marked alive, and the
// liveness of every endpoint is re-checked periodically once the client has
// been started.
type ApiClient struct {
endpoints []*ApiEndpoint // Configured API endpoints, in the order they were passed to NewApiClient.
tokenId string // API token ID, sent in the Authorization header.
secret string // API token secret, sent in the Authorization header.
httpClient *http.Client // HTTP client shared by regular requests and liveness checks.
checkInterval time.Duration // How often endpoint liveness is re-checked after Start.
stop chan struct{} // Closed by Stop to tell the goroutine launched by Start to exit.
mutex sync.Mutex // Held during liveness checks, PerformRequest and IsStopped, so requests wait while a liveness check is running.
stopped bool // Set to true by the Start goroutine once it has observed the stop signal.
index int // Round-robin position used to pick the next alive endpoint.
aliveCount int // Number of endpoints found alive by the most recent liveness check.
cache *cache.Cache // Response cache consulted and filled by PerformRequest.
}
// ApiEndpoint is a single PVE API endpoint together with its last known
// liveness state.
type ApiEndpoint struct {
host string // Endpoint base URL; the request code appends "api2/json..." directly to it.
alive bool // Whether the most recent liveness check succeeded for this endpoint.
}
// ApiResponse is the envelope of a PVE API response as decoded by
// PerformRequest.
type ApiResponse struct {
Data json.RawMessage `json:"data"` // Raw, still-encoded contents of the top-level "data" field.
}
// NewApiClient creates an API client for the given PVE endpoints.
//
// endpoints are the base URLs of the API hosts. The request code appends
// "api2/json..." directly to each of them, so every entry is expected to end
// with a slash. tokenId and secret form the API token sent with every request,
// and checkInterval is the period of the liveness checks run after Start.
//
// Every endpoint is initially marked as not alive; Start performs the first
// liveness check.
func NewApiClient(endpoints []string, tokenId string, secret string, checkInterval time.Duration) *ApiClient {
	instance := &ApiClient{
		checkInterval: checkInterval,
		tokenId:       tokenId,
		secret:        secret,
		// The stop channel must exist before Start/Stop are used: closing a
		// nil channel panics, and receiving from one blocks forever.
		stop: make(chan struct{}),
		// Cache with a 30 second default expiration and a 5 second cleanup interval.
		cache:     cache.New(30*time.Second, 5*time.Second),
		endpoints: make([]*ApiEndpoint, 0, len(endpoints)),
	}

	// Prepare API endpoints.
	for _, host := range endpoints {
		instance.endpoints = append(instance.endpoints, &ApiEndpoint{
			host:  host,
			alive: false,
		})
	}

	// Configure HTTP transport. DialContext replaces the deprecated Dial hook;
	// the deprecated DualStack flag is dropped because dual-stack dialing is
	// the default behaviour of net.Dialer.
	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 60 * time.Second,
	}
	transport := &http.Transport{
		DialContext: dialer.DialContext,
		// TLS certificate verification is disabled.
		// NOTE(review): presumably to accept self-signed PVE certificates — confirm this is intended.
		TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
		DisableCompression:    false,
		MaxIdleConns:          4,
		MaxIdleConnsPerHost:   4,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}

	// Create instance of HTTP client with the customized transport.
	instance.httpClient = &http.Client{
		Timeout:   10 * time.Second,
		Transport: transport,
	}

	return instance
}
// checkEndpointsLiveness probes every configured endpoint and updates its
// alive flag and the client's aliveCount.
//
// Each endpoint is first marked as not alive and then probed with an
// authenticated GET to "<host>api2/json/"; only a 200 response marks it alive
// again. The client mutex is held for the whole check, so other goroutines
// cannot issue requests until the liveness state is up to date. A summary is
// logged only when the number of alive endpoints changed.
func (instance *ApiClient) checkEndpointsLiveness() {
	// We want to make sure other routines won't make any requests until we have checked for alive connections.
	instance.mutex.Lock()
	defer instance.mutex.Unlock()

	prevAliveCount := instance.aliveCount
	instance.aliveCount = 0

	for _, endpoint := range instance.endpoints {
		endpoint.alive = false

		url := fmt.Sprintf("%s%s", endpoint.host, "api2/json/")
		req, err := http.NewRequest("GET", url, nil)
		if err != nil {
			log.Errorf("Liveness check of host %s failed. Error: %s. ", endpoint.host, err)
			continue
		}

		// Authentication token.
		req.Header.Set("Authorization", fmt.Sprintf("PVEAPIToken=%s=%s", instance.tokenId, instance.secret))

		resp, err := instance.httpClient.Do(req)
		if err != nil {
			log.Debugf("Liveness check of host %s failed. Error: %s. ", endpoint.host, err)
			continue
		}

		// Only the status code is needed. Close the body right away (not via
		// defer, which would only run after the whole loop) so the connection
		// is released instead of leaking on every check.
		status := resp.StatusCode
		_ = resp.Body.Close() // Nothing useful can be done if closing fails.

		switch status {
		case http.StatusOK:
			instance.aliveCount++
			endpoint.alive = true
		case http.StatusUnauthorized:
			log.Errorf("Liveness check of host %s failed. Error: Authentication failed.", endpoint.host)
		default:
			log.Errorf("Liveness check of host %s failed. Status code: %d. ", endpoint.host, status)
		}
	}

	if prevAliveCount != instance.aliveCount {
		log.Infof("Checked Proxmox API hosts. Status: (%d/%d) hosts are UP.", instance.aliveCount, len(instance.endpoints))
	}
}
// getAliveEndpoint picks the next endpoint in round-robin order among those
// currently marked alive and advances the round-robin position.
//
// It returns an error when no endpoint is alive. The method does not lock the
// client mutex itself; PerformRequest calls it while holding the mutex.
func (instance *ApiClient) getAliveEndpoint() (*ApiEndpoint, error) {
	// Collect the endpoints that passed the last liveness check.
	candidates := make([]*ApiEndpoint, 0, len(instance.endpoints))
	for i := range instance.endpoints {
		if ep := instance.endpoints[i]; ep.alive {
			candidates = append(candidates, ep)
		}
	}

	count := len(candidates)
	if count == 0 {
		// Nothing to choose from.
		return nil, errors.New("All API endpoints are unreachable.")
	}

	// Select the current slot, then move the position forward, wrapping
	// around the number of alive endpoints.
	slot := instance.index % count
	instance.index = (instance.index + 1) % count

	return candidates[slot], nil
}
// PerformRequest sends an authenticated request to one alive endpoint and
// returns the decoded response envelope.
//
// method is the HTTP method, path is appended verbatim after
// "<host>api2/json", and params are sent as URL query parameters.
//
// Responses are served from the cache when available and successful responses
// are cached for 5 seconds under the key method+path.
// NOTE(review): the cache key does not include params, so two calls that
// differ only in their query parameters share one cache entry — confirm that
// callers never rely on per-params results.
//
// A request that fails at the transport level is retried up to 3 times on the
// same endpoint, waiting 1, 2 and 3 seconds between attempts; the last error
// is returned when all attempts fail. A response with a non-2xx status is
// reported as an error and is never cached. The client mutex is held for the
// whole call, including the retry waits.
func (instance *ApiClient) PerformRequest(method string, path string, params map[string]string) (*ApiResponse, error) {
	instance.mutex.Lock()
	defer instance.mutex.Unlock()

	cacheKey := method + path
	if cached, found := instance.cache.Get(cacheKey); found {
		if r, ok := cached.(ApiResponse); ok {
			return &r, nil
		}
	}

	endpoint, err := instance.getAliveEndpoint()
	if err != nil {
		return nil, err
	}

	requestURL := fmt.Sprintf("%sapi2/json%s", endpoint.host, path)
	req, err := http.NewRequest(method, requestURL, nil)
	if err != nil {
		return nil, err
	}

	// Authentication token.
	req.Header.Set("Authorization", fmt.Sprintf("PVEAPIToken=%s=%s", instance.tokenId, instance.secret))
	req.Header.Set("Connection", "Keep-Alive")
	req.Header.Set("User-Agent", "Proxmox exporter")

	q := req.URL.Query()
	for key, element := range params {
		q.Add(key, element)
	}
	req.URL.RawQuery = q.Encode()

	// One initial attempt plus maxRetries retries. The attempt counter is
	// advanced by the loop statement itself so that a failing endpoint can
	// never cause an endless retry loop while the mutex is held.
	const maxRetries = 3
	var resp *http.Response
	for attempt := 0; ; attempt++ {
		resp, err = instance.httpClient.Do(req)
		if err == nil {
			break
		}
		if attempt == maxRetries {
			// All attempts failed.
			return nil, err
		}
		wait := attempt + 1
		log.Warnf("Request '%s' failed (%s). Attempting again in %d seconds.", requestURL, err.Error(), wait)
		time.Sleep(time.Duration(wait) * time.Second)
	}
	defer resp.Body.Close()

	// Do not decode (and cache) error responses as if they were valid data.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("request %s %s: unexpected status %s", method, requestURL, resp.Status)
	}

	// Decode JSON to ApiResponse struct.
	result := ApiResponse{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}

	instance.cache.Set(cacheKey, result, time.Second*5)
	return &result, nil
}
// IsStopped reports whether the client has been stopped, i.e. whether the
// goroutine launched by Start has observed the stop signal and set the
// stopped flag. The flag is read under the client mutex.
func (instance *ApiClient) IsStopped() bool {
	instance.mutex.Lock()
	state := instance.stopped
	instance.mutex.Unlock()

	return state
}
// Start runs an initial liveness check synchronously and then launches a
// goroutine that repeats the check every checkInterval until the stop channel
// is closed. When that happens the goroutine marks the client as stopped and
// exits.
//
// checkInterval must be positive: time.NewTicker panics otherwise.
func (instance *ApiClient) Start() {
	ticker := time.NewTicker(instance.checkInterval)
	instance.checkEndpointsLiveness()

	go func() {
		// Release the ticker's resources once the goroutine exits.
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				instance.checkEndpointsLiveness()
			case <-instance.stop:
				instance.mutex.Lock()
				instance.stopped = true
				instance.mutex.Unlock()
				return
			}
		}
	}()
}
// Stop closes the idle connections of the internal HTTP client and closes the
// stop channel, which signals the goroutine launched by Start to exit.
//
// It must be called at most once: closing an already closed channel panics.
func (instance *ApiClient) Stop() {
	client := instance.httpClient
	client.CloseIdleConnections()

	signal := instance.stop
	close(signal)
}