Skip to content

Commit f928ee5

Browse files
authored
chore: parallel manifest rendering (#1628)
* Parallelize manifest exporting * Skip manifest export early if no resources to be exported This saves not much but a couple hundred microseconds for these empty environments. * Group manifest generators again but without using a lexical scope * Better environment attributes inside traces * Cleanup * Handle a canceled context properly * Increase channel size to reduce wait
1 parent 1109e36 commit f928ee5

File tree

3 files changed

+148
-64
lines changed

3 files changed

+148
-64
lines changed

internal/telemetry/attributes.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package telemetry
22

33
import (
4-
"fmt"
5-
64
"github.com/grafana/tanka/pkg/spec/v1alpha1"
75
"go.opentelemetry.io/otel/attribute"
86
)
@@ -21,6 +19,7 @@ func AttrNumEnvs(v int) attribute.KeyValue {
2119

2220
func AttrEnv(v *v1alpha1.Environment) []attribute.KeyValue {
2321
return []attribute.KeyValue{
24-
attribute.String("tanka.env.id", fmt.Sprintf("%s@%s", v.Metadata.Name, v.Spec.APIServer)),
22+
attribute.String("tanka.env.name", v.Metadata.Name),
23+
attribute.String("tanka.env.namespace", v.Spec.Namespace),
2524
}
2625
}

pkg/tanka/export.go

Lines changed: 145 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ import (
1010
"os"
1111
"path/filepath"
1212
"strings"
13+
"sync"
1314
"text/template"
1415

1516
"github.com/Masterminds/sprig/v3"
1617
"github.com/pkg/errors"
1718
"github.com/rs/zerolog/log"
19+
"go.opentelemetry.io/otel/attribute"
20+
"golang.org/x/sync/errgroup"
1821
"k8s.io/apimachinery/pkg/labels"
1922

2023
"github.com/grafana/tanka/internal/telemetry"
@@ -69,9 +72,6 @@ func ExportEnvironments(ctx context.Context, envs []*v1alpha1.Environment, to st
6972

7073
span.SetAttributes(telemetry.AttrNumEnvs(len(envs)))
7174

72-
// Keep track of which file maps to which environment
73-
fileToEnv := map[string]string{}
74-
7575
// dir must be empty
7676
empty, err := dirEmpty(to)
7777
if err != nil {
@@ -93,79 +93,168 @@ func ExportEnvironments(ctx context.Context, envs []*v1alpha1.Environment, to st
9393
return fmt.Errorf("deleting previously exported manifests from deleted environments: %w", err)
9494
}
9595

96+
parallelism := opts.Parallelism
97+
98+
if parallelism <= 0 {
99+
parallelism = defaultParallelism
100+
}
101+
102+
if parallelism > len(envs) {
103+
parallelism = len(envs)
104+
}
105+
96106
// get all environments for paths
97107
loadedEnvs, err := parallelLoadEnvironments(ctx, envs, parallelOpts{
98108
Opts: opts.Opts,
99109
Selector: opts.Selector,
100-
Parallelism: opts.Parallelism,
110+
Parallelism: parallelism,
101111
})
102112
if err != nil {
103113
return err
104114
}
105115

106-
{
107-
ctx, span := tracer.Start(ctx, "generateManifests")
108-
defer span.End()
116+
fileToEnv, err := manifestEnvironments(ctx, loadedEnvs, parallelism, to, opts)
117+
if err != nil {
118+
return err
119+
}
109120

110-
// FINDING: Generating the export files takes some time. Perhaps we should parallelize this.
121+
return exportManifestFile(to, fileToEnv, nil)
122+
}
123+
124+
func manifestEnvironments(ctx context.Context, loadedEnvs []*v1alpha1.Environment, parallelism int, to string, opts *ExportEnvOpts) (map[string]string, error) {
125+
ctx, span := tracer.Start(ctx, "tanka.manifestEnvironments")
126+
defer span.End()
127+
span.SetAttributes(telemetry.AttrNumEnvs(len(loadedEnvs)))
128+
fileToEnv := map[string]string{}
129+
fileToEnvLock := sync.Mutex{}
130+
131+
// Similar to the parallel loader, we're going to split all that up
132+
// into multiple routines that handle manifest loading and writing
133+
grp, ctx := errgroup.WithContext(ctx)
134+
135+
// Create a work channel that holds enough for all workers * 2 so that a
136+
// worker doesn't have to wait unnecessarily:
137+
envsToManifest := make(chan *v1alpha1.Environment, parallelism*2)
138+
139+
for range parallelism {
140+
grp.Go(func() error {
141+
ctx, span := tracer.Start(ctx, "tanka.manifestsGenerateWorker")
142+
defer span.End()
143+
144+
for {
145+
select {
146+
case <-ctx.Done():
147+
telemetry.FailSpanWithError(span, ctx.Err())
148+
return ctx.Err()
149+
case work, ok := <-envsToManifest:
150+
if !ok {
151+
// Channel is empty and closed
152+
return nil
153+
}
154+
localFileToEnv, err := manifestSingleEnv(ctx, work, to, opts)
155+
if err != nil {
156+
telemetry.FailSpanWithError(span, err)
157+
return err
158+
}
159+
160+
fileToEnvLock.Lock()
161+
for name, namespace := range localFileToEnv {
162+
fileToEnv[name] = namespace
163+
}
164+
fileToEnvLock.Unlock()
165+
}
166+
}
167+
})
168+
}
169+
170+
grp.Go(func() error {
111171
for _, env := range loadedEnvs {
112-
// get the manifests
113-
loaded, err := LoadManifests(ctx, env, opts.Opts.Filters)
114-
if err != nil {
115-
return err
172+
select {
173+
case <-ctx.Done():
174+
close(envsToManifest)
175+
return ctx.Err()
176+
case envsToManifest <- env:
116177
}
178+
}
179+
close(envsToManifest)
180+
return nil
181+
})
117182

118-
env := loaded.Env
119-
res := loaded.Resources
183+
if err := grp.Wait(); err != nil {
184+
telemetry.FailSpanWithError(span, err)
185+
return nil, err
186+
}
120187

121-
// create raw manifest version of env for templating
122-
env.Data = nil
123-
raw, err := json.Marshal(env)
124-
if err != nil {
125-
return err
126-
}
127-
var menv manifest.Manifest
128-
if err := json.Unmarshal(raw, &menv); err != nil {
129-
return err
130-
}
188+
return fileToEnv, nil
189+
}
131190

132-
// create template
133-
manifestTemplate, err := createTemplate(opts.Format, menv)
134-
if err != nil {
135-
return fmt.Errorf("parsing format: %s", err)
136-
}
191+
func manifestSingleEnv(ctx context.Context, work *v1alpha1.Environment, to string, opts *ExportEnvOpts) (map[string]string, error) {
192+
ctx, span := tracer.Start(ctx, "tanka.manifestSingleEnv")
193+
defer span.End()
194+
span.SetAttributes(telemetry.AttrEnv(work)...)
137195

138-
// write each to a file
139-
for _, m := range res {
140-
// apply template
141-
name, err := applyTemplate(manifestTemplate, m)
142-
if err != nil {
143-
return fmt.Errorf("executing name template: %w", err)
144-
}
196+
fileToEnv := make(map[string]string)
145197

146-
// Create all subfolders in path
147-
relpath := name + "." + opts.Extension
148-
path := filepath.Join(to, relpath)
198+
loaded, err := LoadManifests(ctx, work, opts.Opts.Filters)
199+
if err != nil {
200+
return nil, err
201+
}
149202

150-
fileToEnv[relpath] = env.Metadata.Namespace
203+
env := loaded.Env
204+
res := loaded.Resources
151205

152-
// Abort if already exists
153-
if exists, err := fileExists(path); err != nil {
154-
return err
155-
} else if exists {
156-
return fmt.Errorf("file '%s' already exists. Aborting", path)
157-
}
206+
span.SetAttributes(attribute.Int("tanka.env.num_resources", len(res)))
158207

159-
// Write manifest
160-
data := m.String()
161-
if err := writeExportFile(path, []byte(data)); err != nil {
162-
return err
163-
}
164-
}
165-
}
208+
// If there is no thing to process then we can skip the rest.
209+
if len(res) == 0 {
210+
return fileToEnv, nil
166211
}
167212

168-
return exportManifestFile(to, fileToEnv, nil)
213+
// create raw manifest version of env for templating
214+
env.Data = nil
215+
raw, err := json.Marshal(env)
216+
if err != nil {
217+
return nil, err
218+
}
219+
var menv manifest.Manifest
220+
if err := json.Unmarshal(raw, &menv); err != nil {
221+
return nil, err
222+
}
223+
224+
// create template
225+
manifestTemplate, err := createTemplate(ctx, opts.Format, menv)
226+
if err != nil {
227+
return nil, fmt.Errorf("parsing format: %s", err)
228+
}
229+
230+
// write each to a file
231+
for _, m := range res {
232+
// apply template
233+
name, err := applyTemplate(ctx, manifestTemplate, m)
234+
if err != nil {
235+
return nil, fmt.Errorf("executing name template: %w", err)
236+
}
237+
238+
// Create all subfolders in path
239+
relpath := name + "." + opts.Extension
240+
path := filepath.Join(to, relpath)
241+
242+
fileToEnv[relpath] = env.Metadata.Namespace
243+
244+
// Abort if already exists
245+
if exists, err := fileExists(path); err != nil {
246+
return nil, err
247+
} else if exists {
248+
return nil, fmt.Errorf("file '%s' already exists. Aborting", path)
249+
}
250+
251+
// Write manifest
252+
data := m.String()
253+
if err := writeExportFile(path, []byte(data)); err != nil {
254+
return nil, err
255+
}
256+
}
257+
return fileToEnv, nil
169258
}
170259

171260
func fileExists(name string) (bool, error) {
@@ -284,7 +373,7 @@ func writeExportFile(path string, data []byte) error {
284373
return os.WriteFile(path, data, 0644)
285374
}
286375

287-
func createTemplate(format string, env manifest.Manifest) (*template.Template, error) {
376+
func createTemplate(_ context.Context, format string, env manifest.Manifest) (*template.Template, error) {
288377
// Replace all os.path separators in string with BelRune for creating subfolders
289378
replaceFormat := replaceTmplText(format, string(os.PathSeparator), BelRune)
290379

@@ -318,7 +407,7 @@ func replaceTmplText(s, oldString, newString string) string {
318407
return strings.Join(parts, "")
319408
}
320409

321-
func applyTemplate(template *template.Template, m manifest.Manifest) (path string, err error) {
410+
func applyTemplate(_ context.Context, template *template.Template, m manifest.Manifest) (path string, err error) {
322411
buf := bytes.Buffer{}
323412
if err := template.Execute(&buf, m); err != nil {
324413
return "", err

pkg/tanka/load.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,7 @@ func LoadEnvironment(ctx context.Context, path string, opts Opts) (*v1alpha1.Env
7676
return env, nil
7777
}
7878

79-
func LoadManifests(ctx context.Context, env *v1alpha1.Environment, filters process.Matchers) (*LoadResult, error) {
80-
_, span := tracer.Start(ctx, "tanka.LoadManifests")
81-
defer span.End()
82-
span.SetAttributes(telemetry.AttrEnv(env)...)
83-
79+
func LoadManifests(_ context.Context, env *v1alpha1.Environment, filters process.Matchers) (*LoadResult, error) {
8480
if err := checkVersion(env.Spec.ExpectVersions.Tanka); err != nil {
8581
return nil, err
8682
}

0 commit comments

Comments
 (0)