Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 16 additions & 26 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/alertmanager/types"
promapi "github.com/prometheus/client_golang/api"
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
remoteapi "github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
Expand Down Expand Up @@ -51,6 +52,7 @@ type Client struct {
distributorAddress string
timeout time.Duration
httpClient *http.Client
remoteWriteAPI *remoteapi.API
querierClient promv1.API
orgID string
}
Expand All @@ -72,6 +74,17 @@ func NewClient(
return nil, err
}

client := &http.Client{
Transport: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport},
}
remoteWriteAPI, err := remoteapi.NewAPI(fmt.Sprintf("http://%s", distributorAddress),
remoteapi.WithAPIHTTPClient(client),
remoteapi.WithAPIPath("/api/prom/push"),
)
if err != nil {
return nil, err
}

c := &Client{
distributorAddress: distributorAddress,
querierAddress: querierAddress,
Expand All @@ -80,6 +93,7 @@ func NewClient(
timeout: 30 * time.Second,
httpClient: &http.Client{},
querierClient: promv1.NewAPI(querierAPIClient),
remoteWriteAPI: remoteWriteAPI,
orgID: orgID,
}

Expand Down Expand Up @@ -184,36 +198,12 @@ func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricM
}

// PushV2 the input timeseries to the remote endpoint
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (remoteapi.WriteResponseStats, error) {
// Create write request
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
return c.remoteWriteAPI.Write(ctx, remoteapi.WriteV2MessageType, &writev2.Request{Symbols: symbols, Timeseries: timeseries})
}

func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
Expand Down
56 changes: 24 additions & 32 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package integration

import (
"math/rand"
"net/http"
"path"
"testing"
"time"

remoteapi "github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
Expand Down Expand Up @@ -95,10 +95,9 @@ func TestIngesterRollingUpdate(t *testing.T) {

// series push
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
res, err := c.PushV2(symbols1, series)
stats, err := c.PushV2(symbols1, series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "0", "0")
testPushHeader(t, stats, 1, 0, 0)

// sample
result, err := c.Query("test_series", now)
Expand All @@ -113,16 +112,14 @@ func TestIngesterRollingUpdate(t *testing.T) {
// histogram
histogramIdx := rand.Uint32()
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
res, err = c.PushV2(symbols2, histogramSeries)
writeStats, err := c.PushV2(symbols2, histogramSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "0", "1", "0")
testPushHeader(t, writeStats, 0, 1, 0)

symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
res, err = c.PushV2(symbols3, histogramFloatSeries)
writeStats, err = c.PushV2(symbols3, histogramFloatSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "0", "1", "0")
testPushHeader(t, writeStats, 0, 1, 0)

testHistogramTimestamp := now.Add(blockRangePeriod * 2)
expectedHistogram := tsdbutil.GenerateTestHistogram(int64(histogramIdx))
Expand Down Expand Up @@ -198,9 +195,9 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {

// series push
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
res, err := c.PushV2(symbols1, series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
_, err = c.PushV2(symbols1, series)
require.Error(t, err)
require.Contains(t, err.Error(), "sent v2 request; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted")

// sample
result, err := c.Query("test_series", now)
Expand Down Expand Up @@ -266,10 +263,9 @@ func TestIngest(t *testing.T) {

// series push
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
res, err := c.PushV2(symbols1, series)
writeStats, err := c.PushV2(symbols1, series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "0", "0")
testPushHeader(t, writeStats, 1, 0, 0)

// sample
result, err := c.Query("test_series", now)
Expand All @@ -284,17 +280,15 @@ func TestIngest(t *testing.T) {
// histogram
histogramIdx := rand.Uint32()
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
res, err = c.PushV2(symbols2, histogramSeries)
writeStats, err = c.PushV2(symbols2, histogramSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "0", "1", "0")
testPushHeader(t, writeStats, 0, 1, 0)

// float histogram
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
res, err = c.PushV2(symbols3, histogramFloatSeries)
writeStats, err = c.PushV2(symbols3, histogramFloatSeries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "0", "1", "0")
testPushHeader(t, writeStats, 0, 1, 0)

testHistogramTimestamp := now.Add(blockRangePeriod * 2)
expectedHistogram := tsdbutil.GenerateTestHistogram(int64(histogramIdx))
Expand Down Expand Up @@ -379,10 +373,9 @@ func TestExemplar(t *testing.T) {
},
}

res, err := c.PushV2(symbols, timeseries)
writeStats, err := c.PushV2(symbols, timeseries)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "1", "0", "1")
testPushHeader(t, writeStats, 1, 0, 1)

start := time.Now().Add(-time.Minute)
end := now.Add(time.Minute)
Expand Down Expand Up @@ -451,14 +444,13 @@ func Test_WriteStatWithReplication(t *testing.T) {
numSamples := 20
scrapeInterval := 30 * time.Second
symbols, series := e2e.GenerateV2SeriesWithSamples("test_series", start, scrapeInterval, 0, numSamples, prompb.Label{Name: "job", Value: "test"})
res, err := c.PushV2(symbols, []writev2.TimeSeries{series})
writeStats, err := c.PushV2(symbols, []writev2.TimeSeries{series})
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
testPushHeader(t, res.Header, "20", "0", "0")
testPushHeader(t, writeStats, 20, 0, 0)
}

func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
func testPushHeader(t *testing.T, stats remoteapi.WriteResponseStats, expectedSamples, expectedHistogram, expectedExemplars int) {
require.Equal(t, expectedSamples, stats.Samples)
require.Equal(t, expectedHistogram, stats.Histograms)
require.Equal(t, expectedExemplars, stats.Exemplars)
}
Loading