258 lines
6.9 KiB
Go
258 lines
6.9 KiB
Go
package proxmox
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/patrickmn/go-cache"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// Multipoint API HTTP client will make requests in round-robin fashion on all endpoints which are reachable.
|
|
// The client will periodically check endpoints for liveness.
|
|
type ApiClient struct {
|
|
endpoints []*ApiEndpoint // API endpoints array.
|
|
tokenId string // API token ID.
|
|
secret string // API token secret.
|
|
httpClient *http.Client // Internal HTTP client.
|
|
checkInterval time.Duration // Check interval how often will APi client look for endpoints liveness.
|
|
stop chan struct{} // Stop channel which is used in ticker.
|
|
mutex sync.Mutex // Stop mutex to prevent requests making requests on unitialized API client instance.
|
|
stopped bool // state of API client.
|
|
index int // Round robin next index.
|
|
aliveCount int // Count of alive endpoints.
|
|
cache *cache.Cache // Cache instance.
|
|
}
|
|
|
|
// ApiEndpoint describes a single PVE API host together with the outcome of
// its most recent liveness check.
type ApiEndpoint struct {
	// host is the endpoint address; request URLs are built by appending to it.
	host string
	// alive reports whether the last liveness check of this host succeeded.
	alive bool
}
|
|
|
|
// ApiResponse is the envelope that PVE API replies are decoded into.
type ApiResponse struct {
	// Data holds the raw, still-encoded JSON found under the "data" key.
	Data json.RawMessage `json:"data"`
}
|
|
|
|
// Create new instance of API HTTP client.
|
|
func NewApiClient(endpoints []string, tokenId string, secret string, checkInterval time.Duration) *ApiClient {
|
|
instance := ApiClient{
|
|
checkInterval: checkInterval,
|
|
tokenId: tokenId,
|
|
secret: secret,
|
|
}
|
|
|
|
// Cache initialization.
|
|
instance.cache = cache.New(30*time.Second, 5*time.Second)
|
|
|
|
// Prepare API endpoints.
|
|
for _, endpoint := range endpoints {
|
|
apiEndpoint := ApiEndpoint{
|
|
host: endpoint,
|
|
alive: false,
|
|
}
|
|
instance.endpoints = append(instance.endpoints, &apiEndpoint)
|
|
}
|
|
|
|
// Configure HTTP transport.
|
|
transport := &http.Transport{
|
|
Dial: (&net.Dialer{
|
|
Timeout: 30 * time.Second,
|
|
KeepAlive: 60 * time.Second,
|
|
DualStack: true,
|
|
}).Dial,
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
|
DisableCompression: false,
|
|
MaxIdleConns: 4,
|
|
MaxIdleConnsPerHost: 4,
|
|
IdleConnTimeout: 90 * time.Second,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
ExpectContinueTimeout: 1 * time.Second,
|
|
}
|
|
|
|
// Create instance of HTTP client with modified HTTP transport.
|
|
instance.httpClient = &http.Client{
|
|
Timeout: time.Second * 10,
|
|
Transport: transport,
|
|
}
|
|
|
|
return &instance
|
|
}
|
|
|
|
// Check endpoint liveness state.
|
|
func (instance *ApiClient) checkEndpointsLiveness() {
|
|
// We want to make sure other routines won't make any requests until we have checked for alive connections.
|
|
instance.mutex.Lock()
|
|
defer instance.mutex.Unlock()
|
|
|
|
prevAliveCount := instance.aliveCount
|
|
instance.aliveCount = 0
|
|
|
|
for _, endpoint := range instance.endpoints {
|
|
endpoint.alive = false
|
|
|
|
url := fmt.Sprintf("%s%s", endpoint.host, "api2/json/")
|
|
req, err := http.NewRequest("GET", url, nil)
|
|
if err != nil {
|
|
log.Errorf("Liveness check of host %s failed. Error: %s. ", endpoint.host, err)
|
|
continue
|
|
}
|
|
|
|
// Authentication token.
|
|
req.Header.Set("Authorization", fmt.Sprintf("PVEAPIToken=%s=%s", instance.tokenId, instance.secret))
|
|
|
|
resp, err := instance.httpClient.Do(req)
|
|
if err != nil {
|
|
log.Debugf("Liveness check of host %s failed. Error: %s. ", endpoint.host, err)
|
|
continue
|
|
}
|
|
|
|
if resp.StatusCode == 401 {
|
|
log.Errorf("Liveness check of host %s failed. Error: Authentication failed.", endpoint.host)
|
|
continue
|
|
}
|
|
|
|
if resp.StatusCode != 200 {
|
|
log.Errorf("Liveness check of host %s failed. Status code: %d. ", endpoint.host, resp.StatusCode)
|
|
continue
|
|
}
|
|
|
|
instance.aliveCount++
|
|
endpoint.alive = true
|
|
}
|
|
|
|
if prevAliveCount != instance.aliveCount {
|
|
log.Infof("Checked Proxmox API hosts. Status: (%d/%d) hosts are UP.", instance.aliveCount, len(instance.endpoints))
|
|
}
|
|
}
|
|
|
|
// Return one alive endpoint Round-Robin.
|
|
func (instance *ApiClient) getAliveEndpoint() (*ApiEndpoint, error) {
|
|
var alive []*ApiEndpoint
|
|
|
|
// Prepare only alive endpoints.
|
|
for _, endpoint := range instance.endpoints {
|
|
if endpoint.alive {
|
|
alive = append(alive, endpoint)
|
|
}
|
|
}
|
|
|
|
length := len(alive)
|
|
|
|
// If there are no alive endpoints then return error.
|
|
if length == 0 {
|
|
return nil, errors.New("All API endpoints are unreachable.")
|
|
}
|
|
|
|
next := alive[instance.index%length]
|
|
instance.index = (instance.index + 1) % length
|
|
return next, nil
|
|
}
|
|
|
|
// Check if API client is stopped.
|
|
func (instance *ApiClient) PerformRequest(method string, path string, params map[string]string) (*ApiResponse, error) {
|
|
instance.mutex.Lock()
|
|
defer instance.mutex.Unlock()
|
|
|
|
response, found := instance.cache.Get(method + path)
|
|
if found {
|
|
r := response.(ApiResponse)
|
|
return &r, nil
|
|
}
|
|
|
|
endpoint, err := instance.getAliveEndpoint()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
url := fmt.Sprintf("%sapi2/json%s", endpoint.host, path)
|
|
req, err := http.NewRequest(method, url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Authentication token.
|
|
req.Header.Set("Authorization", fmt.Sprintf("PVEAPIToken=%s=%s", instance.tokenId, instance.secret))
|
|
|
|
q := req.URL.Query()
|
|
for key, element := range params {
|
|
q.Add(key, element)
|
|
}
|
|
req.URL.RawQuery = q.Encode()
|
|
req.Header.Set("Connection", "Keep-Alive")
|
|
req.Header.Set("User-Agent", "Proxmox exporter")
|
|
|
|
var maxAttempts int = 3
|
|
var attempts int = 0
|
|
var resp *http.Response
|
|
for attempts <= maxAttempts {
|
|
resp, err = instance.httpClient.Do(req)
|
|
// If request is sucessful, then break the loop.
|
|
if err == nil {
|
|
break
|
|
}
|
|
// In case the request failed it will be attempted again in another loop iteration after second + attempts made.
|
|
if err != nil && attempts < maxAttempts {
|
|
log.Warnf("Request '%s' failed (%s). Attempting again in %d seconds.", url, err.Error(), attempts+1)
|
|
time.Sleep(time.Duration(attempts+1) * time.Second)
|
|
continue
|
|
}
|
|
// If all attempts fail then return the error.
|
|
if err != nil && attempts == maxAttempts {
|
|
return nil, err
|
|
}
|
|
attempts++
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
// Decode JSON to ApiResponse struct.
|
|
result := ApiResponse{}
|
|
err = json.NewDecoder(resp.Body).Decode(&result)
|
|
if err != nil {
|
|
return nil, err
|
|
} else {
|
|
instance.cache.Set(method+path, result, time.Second*5)
|
|
return &result, nil
|
|
}
|
|
}
|
|
|
|
// Check if API client is stopped.
|
|
func (instance *ApiClient) IsStopped() bool {
|
|
instance.mutex.Lock()
|
|
defer instance.mutex.Unlock()
|
|
return instance.stopped
|
|
}
|
|
|
|
// Start the API client.
|
|
func (instance *ApiClient) Start() {
|
|
ticker := time.NewTicker(instance.checkInterval)
|
|
instance.checkEndpointsLiveness()
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
instance.checkEndpointsLiveness()
|
|
case <-instance.stop:
|
|
instance.mutex.Lock()
|
|
defer instance.mutex.Unlock()
|
|
instance.stopped = true
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Close connections and stop the API client.
|
|
func (instance *ApiClient) Stop() {
|
|
instance.httpClient.CloseIdleConnections()
|
|
close(instance.stop)
|
|
}
|