未验证 提交 c52f0402 编写于 作者: J Josh van Leeuwen 提交者: GitHub

Integration Tests: placement (#6340)

* Adds initial integration testing scaffolding
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* linting
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Use command context to shutdown daprd on windows
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Use windows specific process interrupt for windows
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Expect error response when interrupting windows
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Fix passing flags to daprd binary, and re-enable apphealthz test
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Move integration tests to separate make target and run in new github
workflow step
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Run integration tests at the same time as unit. Build needs both
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Update docs for integration build tag. daprd logger writes to test
logger
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Move integration framework into packages
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* make integration-test -> make test-integration
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Use taskkill to kill the process on windows
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Send 2 interrupt signals to daprd, regardless of OS
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Go mod tidy
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Remove /T option (kill subprocesses) from windows kill exec command
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Rever kill_windows to use CTRL_BREAK_EVENT strategy for killing
processes.
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Make iowriter a buffered writer
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Don't do testing errors inside go routines, and pass context to HTTP
requests
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Change build tag from `all_components` to `allcomponents`
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Use windows specific process interrupt for windows
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Make process a generic interface for running more binaries
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Integration tests: Adds placement binary and healthz placement test case
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Adds ports tests for placement binary
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Rename process/base -> process/exec
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Increase placement port dialer from 1ms to 100ms
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Linting
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Update metadata to new proc framework
Signed-off-by: Njoshvanl <me@joshvanl.dev>

* Remove concurrency to improve test reliability on slow machines in CI
Signed-off-by: Njoshvanl <me@joshvanl.dev>

---------
Signed-off-by: Njoshvanl <me@joshvanl.dev>
Co-authored-by: NArtur Souza <asouza.pro@gmail.com>
上级 ca08aa97
...@@ -11,8 +11,8 @@ always built from source within the test. ...@@ -11,8 +11,8 @@ always built from source within the test.
## Invoking the test ## Invoking the test
```go ```bash
go test -v -race -tags="integration" ./tests/integration go test -v -race -tags="integration" ./tests/integration`
``` ```
Rather than building from source, you can also set a custom daprd binary path Rather than building from source, you can also set a custom daprd binary path
......
...@@ -15,203 +15,47 @@ package framework ...@@ -15,203 +15,47 @@ package framework
import ( import (
"context" "context"
"errors"
"io"
"net"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"sync"
"testing" "testing"
"github.com/google/uuid" "github.com/dapr/dapr/tests/integration/framework/process"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework/freeport"
"github.com/dapr/dapr/tests/integration/framework/iowriter"
"github.com/dapr/dapr/tests/integration/framework/kill"
) )
// daprdOptions contains the options for running Daprd in integration tests. type options struct {
type daprdOptions struct { procs []process.Interface
stdout io.WriteCloser
stderr io.WriteCloser
binPath string
appID string
appPort int
grpcPort int
httpPort int
internalGRPCPort int
publicPort int
metricsPort int
profilePort int
runErrorFn func(error)
exitCode int
appHealthCheck bool
appHealthCheckPath string
appHealthProbeInterval int
appHealthProbeThreshold int
} }
// RunDaprdOption is a function that configures the DaprdOptions. // Option is a function that configures the Framework's options.
type RunDaprdOption func(*daprdOptions) type Option func(*options)
type Command struct {
lock sync.Mutex
cmd *exec.Cmd
cmdcancel context.CancelFunc type Framework struct {
runErrorFnFn func(error) procs []process.Interface
exitCode int
stdoutpipe io.WriteCloser
stderrpipe io.WriteCloser
AppID string
AppPort int
GRPCPort int
HTTPPort int
InternalGRPCPort int
PublicPort int
MetricsPort int
ProfilePort int
} }
func RunDaprd(t *testing.T, ctx context.Context, opts ...RunDaprdOption) *Command { func Run(t *testing.T, ctx context.Context, opts ...Option) *Framework {
t.Helper() t.Helper()
uid, err := uuid.NewUUID() o := options{}
require.NoError(t, err)
appListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, appListener.Close())
})
go func() {
for {
conn, err := appListener.Accept()
if errors.Is(err, net.ErrClosed) {
return
}
conn.Close()
}
}()
defaultExitCode := 0
if runtime.GOOS == "windows" {
// Windows returns 1 when we kill the process.
defaultExitCode = 1
}
fp := freeport.New(t, 6)
options := daprdOptions{
stdout: iowriter.New(t),
stderr: iowriter.New(t),
binPath: os.Getenv("DAPR_INTEGRATION_DAPRD_PATH"),
appID: uid.String(),
appPort: appListener.Addr().(*net.TCPAddr).Port,
grpcPort: fp.Port(t, 0),
httpPort: fp.Port(t, 1),
internalGRPCPort: fp.Port(t, 2),
publicPort: fp.Port(t, 3),
metricsPort: fp.Port(t, 4),
profilePort: fp.Port(t, 5),
runErrorFn: func(err error) {
if runtime.GOOS == "windows" {
// Windows returns 1 when we kill the process.
assert.ErrorContains(t, err, "exit status 1")
} else {
assert.NoError(t, err, "expected daprd to run without error")
}
},
exitCode: defaultExitCode,
}
for _, opt := range opts { for _, opt := range opts {
opt(&options) opt(&o)
} }
args := []string{ t.Logf("starting %d processes", len(o.procs))
"--log-level=" + "debug",
"--app-id=" + options.appID,
"--app-port=" + strconv.Itoa(options.appPort),
"--dapr-grpc-port=" + strconv.Itoa(options.grpcPort),
"--dapr-http-port=" + strconv.Itoa(options.httpPort),
"--dapr-internal-grpc-port=" + strconv.Itoa(options.internalGRPCPort),
"--dapr-public-port=" + strconv.Itoa(options.publicPort),
"--metrics-port=" + strconv.Itoa(options.metricsPort),
"--profile-port=" + strconv.Itoa(options.profilePort),
"--enable-app-health-check=" + strconv.FormatBool(options.appHealthCheck),
"--app-health-probe-interval=" + strconv.Itoa(options.appHealthProbeInterval),
"--app-health-threshold=" + strconv.Itoa(options.appHealthProbeThreshold),
}
if options.appHealthCheckPath != "" {
args = append(args, "--app-health-check-path="+options.appHealthCheckPath)
}
t.Logf("Running daprd with args: %s %s", options.binPath, strings.Join(args, " ")) for _, proc := range o.procs {
ctx, cancel := context.WithCancel(ctx) proc.Run(t, ctx)
//nolint:gosec
cmd := exec.CommandContext(ctx, options.binPath, args...)
cmd.Stdout = options.stdout
cmd.Stderr = options.stderr
daprd := &Command{
cmdcancel: cancel,
cmd: cmd,
stdoutpipe: options.stdout,
stderrpipe: options.stderr,
AppID: options.appID,
AppPort: options.appPort,
GRPCPort: options.grpcPort,
HTTPPort: options.httpPort,
InternalGRPCPort: options.internalGRPCPort,
PublicPort: options.publicPort,
MetricsPort: options.metricsPort,
ProfilePort: options.profilePort,
runErrorFnFn: options.runErrorFn,
exitCode: options.exitCode,
} }
fp.Free(t) return &Framework{
require.NoError(t, cmd.Start()) procs: o.procs,
}
return daprd
}
func (c *Command) Cleanup(t *testing.T) {
t.Helper()
c.lock.Lock()
defer c.lock.Unlock()
assert.NoError(t, c.stderrpipe.Close())
assert.NoError(t, c.stdoutpipe.Close())
kill.Kill(t, c.cmd)
c.checkExit(t)
}
func (c *Command) PID(t *testing.T) int {
t.Helper()
c.lock.Lock()
defer c.lock.Unlock()
assert.NotNil(t, c.cmd.Process, "PID called but process is nil")
return c.cmd.Process.Pid
} }
func (c *Command) checkExit(t *testing.T) { func (f *Framework) Cleanup(t *testing.T) {
t.Helper() t.Helper()
t.Log("waiting for daprd process to exit") t.Logf("stopping %d processes", len(f.procs))
c.runErrorFnFn(c.cmd.Wait()) for _, proc := range f.procs {
assert.NotNil(t, c.cmd.ProcessState, "process state should not be nil") proc.Cleanup(t)
assert.Equalf(t, c.exitCode, c.cmd.ProcessState.ExitCode(), "expected exit code to be %d", c.exitCode) }
} }
...@@ -13,106 +13,10 @@ limitations under the License. ...@@ -13,106 +13,10 @@ limitations under the License.
package framework package framework
import "io" import "github.com/dapr/dapr/tests/integration/framework/process"
func WithBinPath(binPath string) RunDaprdOption { func WithProcesses(procs ...process.Interface) Option {
return func(o *daprdOptions) { return func(o *options) {
o.binPath = binPath o.procs = procs
}
}
func WithStdout(stdout io.WriteCloser) RunDaprdOption {
return func(o *daprdOptions) {
o.stdout = stdout
}
}
func WithStderr(stderr io.WriteCloser) RunDaprdOption {
return func(o *daprdOptions) {
o.stderr = stderr
}
}
func WithAppID(appID string) RunDaprdOption {
return func(o *daprdOptions) {
o.appID = appID
}
}
func WithAppPort(port int) RunDaprdOption {
return func(o *daprdOptions) {
o.appPort = port
}
}
func WithGRPCPort(port int) RunDaprdOption {
return func(o *daprdOptions) {
o.grpcPort = port
}
}
func WithHTTPPort(port int) RunDaprdOption {
return func(o *daprdOptions) {
o.httpPort = port
}
}
func WithInternalGRPCPort(port int) RunDaprdOption {
return func(o *daprdOptions) {
o.internalGRPCPort = port
}
}
func WithPublicPort(port int) RunDaprdOption {
return func(o *daprdOptions) {
o.publicPort = port
}
}
func WithMetricsPort(port int) RunDaprdOption {
return func(o *daprdOptions) {
o.metricsPort = port
}
}
func WithProfilePort(port int) RunDaprdOption {
return func(o *daprdOptions) {
o.profilePort = port
}
}
func WithRunError(ferr func(error)) RunDaprdOption {
return func(o *daprdOptions) {
o.runErrorFn = ferr
}
}
func WithExitCode(code int) RunDaprdOption {
return func(o *daprdOptions) {
o.exitCode = code
}
}
func WithAppHealthCheck(enabled bool) RunDaprdOption {
return func(o *daprdOptions) {
o.appHealthCheck = enabled
}
}
func WithAppHealthCheckPath(path string) RunDaprdOption {
return func(o *daprdOptions) {
o.appHealthCheckPath = path
}
}
func WithAppHealthProbeInterval(interval int) RunDaprdOption {
return func(o *daprdOptions) {
o.appHealthProbeInterval = interval
}
}
func WithAppHealthProbeThreshold(threshold int) RunDaprdOption {
return func(o *daprdOptions) {
o.appHealthProbeThreshold = threshold
} }
} }
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package daprd
import (
"context"
"errors"
"net"
"os"
"strconv"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework/freeport"
"github.com/dapr/dapr/tests/integration/framework/process"
"github.com/dapr/dapr/tests/integration/framework/process/exec"
)
// options contains the options for running Daprd in integration tests.
type options struct {
execOpts []exec.Option
appID string
appPort int
grpcPort int
httpPort int
internalGRPCPort int
publicPort int
metricsPort int
profilePort int
appHealthCheck bool
appHealthCheckPath string
appHealthProbeInterval int
appHealthProbeThreshold int
}
// Option is a function that configures the dapr process.
type Option func(*options)
type Daprd struct {
exec process.Interface
freeport *freeport.FreePort
AppID string
AppPort int
GRPCPort int
HTTPPort int
InternalGRPCPort int
PublicPort int
MetricsPort int
ProfilePort int
}
func New(t *testing.T, fopts ...Option) *Daprd {
t.Helper()
uid, err := uuid.NewUUID()
require.NoError(t, err)
appListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, appListener.Close())
})
go func() {
for {
conn, err := appListener.Accept()
if errors.Is(err, net.ErrClosed) {
return
}
conn.Close()
}
}()
fp := freeport.New(t, 6)
opts := options{
appID: uid.String(),
appPort: appListener.Addr().(*net.TCPAddr).Port,
grpcPort: fp.Port(t, 0),
httpPort: fp.Port(t, 1),
internalGRPCPort: fp.Port(t, 2),
publicPort: fp.Port(t, 3),
metricsPort: fp.Port(t, 4),
profilePort: fp.Port(t, 5),
}
for _, fopt := range fopts {
fopt(&opts)
}
args := []string{
"--log-level=" + "debug",
"--app-id=" + opts.appID,
"--app-port=" + strconv.Itoa(opts.appPort),
"--dapr-grpc-port=" + strconv.Itoa(opts.grpcPort),
"--dapr-http-port=" + strconv.Itoa(opts.httpPort),
"--dapr-internal-grpc-port=" + strconv.Itoa(opts.internalGRPCPort),
"--dapr-public-port=" + strconv.Itoa(opts.publicPort),
"--metrics-port=" + strconv.Itoa(opts.metricsPort),
"--profile-port=" + strconv.Itoa(opts.profilePort),
"--enable-app-health-check=" + strconv.FormatBool(opts.appHealthCheck),
"--app-health-probe-interval=" + strconv.Itoa(opts.appHealthProbeInterval),
"--app-health-threshold=" + strconv.Itoa(opts.appHealthProbeThreshold),
}
if opts.appHealthCheckPath != "" {
args = append(args, "--app-health-check-path="+opts.appHealthCheckPath)
}
return &Daprd{
exec: exec.New(t, os.Getenv("DAPR_INTEGRATION_DAPRD_PATH"), args, opts.execOpts...),
freeport: fp,
AppID: opts.appID,
AppPort: opts.appPort,
GRPCPort: opts.grpcPort,
HTTPPort: opts.httpPort,
InternalGRPCPort: opts.internalGRPCPort,
PublicPort: opts.publicPort,
MetricsPort: opts.metricsPort,
ProfilePort: opts.profilePort,
}
}
func (d *Daprd) Run(t *testing.T, ctx context.Context) {
d.freeport.Free(t)
d.exec.Run(t, ctx)
}
func (d *Daprd) Cleanup(t *testing.T) {
d.exec.Cleanup(t)
}
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package daprd
import "github.com/dapr/dapr/tests/integration/framework/process/exec"
func WithExecOptions(execOptions ...exec.Option) Option {
return func(o *options) {
o.execOpts = execOptions
}
}
func WithAppID(appID string) Option {
return func(o *options) {
o.appID = appID
}
}
func WithAppPort(port int) Option {
return func(o *options) {
o.appPort = port
}
}
func WithGRPCPort(port int) Option {
return func(o *options) {
o.grpcPort = port
}
}
func WithHTTPPort(port int) Option {
return func(o *options) {
o.httpPort = port
}
}
func WithInternalGRPCPort(port int) Option {
return func(o *options) {
o.internalGRPCPort = port
}
}
func WithPublicPort(port int) Option {
return func(o *options) {
o.publicPort = port
}
}
func WithMetricsPort(port int) Option {
return func(o *options) {
o.metricsPort = port
}
}
func WithProfilePort(port int) Option {
return func(o *options) {
o.profilePort = port
}
}
func WithAppHealthCheck(enabled bool) Option {
return func(o *options) {
o.appHealthCheck = enabled
}
}
func WithAppHealthCheckPath(path string) Option {
return func(o *options) {
o.appHealthCheckPath = path
}
}
func WithAppHealthProbeInterval(interval int) Option {
return func(o *options) {
o.appHealthProbeInterval = interval
}
}
func WithAppHealthProbeThreshold(threshold int) Option {
return func(o *options) {
o.appHealthProbeThreshold = threshold
}
}
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import (
"context"
"io"
oexec "os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework/process/exec/iowriter"
"github.com/dapr/dapr/tests/integration/framework/process/exec/kill"
)
type options struct {
stdout io.WriteCloser
stderr io.WriteCloser
runErrorFn func(error)
exitCode int
}
type Option func(*options)
type exec struct {
lock sync.Mutex
cmd *oexec.Cmd
args []string
binPath string
runErrorFn func(error)
exitCode int
stdoutpipe io.WriteCloser
stderrpipe io.WriteCloser
}
func New(t *testing.T, binPath string, args []string, fopts ...Option) *exec {
t.Helper()
defaultExitCode := 0
if runtime.GOOS == "windows" {
// Windows returns 1 when we kill the process.
defaultExitCode = 1
}
opts := options{
stdout: iowriter.New(t, filepath.Base(binPath)),
stderr: iowriter.New(t, filepath.Base(binPath)),
runErrorFn: func(err error) {
t.Helper()
if runtime.GOOS == "windows" {
// Windows returns 1 when we kill the process.
assert.ErrorContains(t, err, "exit status 1")
} else {
assert.NoError(t, err, "expected %q to run without error", binPath)
}
},
exitCode: defaultExitCode,
}
for _, fopt := range fopts {
fopt(&opts)
}
return &exec{
binPath: binPath,
args: args,
stdoutpipe: opts.stdout,
stderrpipe: opts.stderr,
runErrorFn: opts.runErrorFn,
exitCode: opts.exitCode,
}
}
func (e *exec) Run(t *testing.T, ctx context.Context) {
t.Helper()
e.lock.Lock()
defer e.lock.Unlock()
t.Logf("Running %q with args: %s %s", filepath.Base(e.binPath), e.binPath, strings.Join(e.args, " "))
//nolint:gosec
e.cmd = oexec.CommandContext(ctx, e.binPath, e.args...)
e.cmd.Stdout = e.stdoutpipe
e.cmd.Stderr = e.stderrpipe
require.NoError(t, e.cmd.Start())
}
func (e *exec) Cleanup(t *testing.T) {
t.Helper()
e.lock.Lock()
defer e.lock.Unlock()
assert.NoError(t, e.stderrpipe.Close())
assert.NoError(t, e.stdoutpipe.Close())
kill.Kill(t, e.cmd)
e.checkExit(t)
}
func (e *exec) checkExit(t *testing.T) {
t.Helper()
t.Logf("waiting for %q process to exit", filepath.Base(e.binPath))
e.runErrorFn(e.cmd.Wait())
assert.NotNil(t, e.cmd.ProcessState, "process state should not be nil")
assert.Equalf(t, e.exitCode, e.cmd.ProcessState.ExitCode(), "expected exit code to be %d", e.exitCode)
}
...@@ -31,15 +31,16 @@ type Logger interface { ...@@ -31,15 +31,16 @@ type Logger interface {
// writes until a newline is encountered, at which point it flushes the buffer // writes until a newline is encountered, at which point it flushes the buffer
// to the test logger. // to the test logger.
type stdwriter struct { type stdwriter struct {
t Logger t Logger
buf bytes.Buffer procName string
lock sync.Mutex buf bytes.Buffer
lock sync.Mutex
} }
func New(t Logger) io.WriteCloser { func New(t Logger, procName string) io.WriteCloser {
return &stdwriter{ return &stdwriter{
t: t, t: t,
buf: bytes.Buffer{}, procName: procName,
} }
} }
...@@ -72,8 +73,7 @@ func (w *stdwriter) Close() error { ...@@ -72,8 +73,7 @@ func (w *stdwriter) Close() error {
// before calling. // before calling.
func (w *stdwriter) flush() { func (w *stdwriter) flush() {
defer w.buf.Reset() defer w.buf.Reset()
if b := w.buf.Bytes(); len(b) > 0 { if b := w.buf.Bytes(); len(b) > 0 {
w.t.Log(w.t.Name() + ": " + string(b)) w.t.Log(w.t.Name() + "/" + w.procName + ": " + string(b))
} }
} }
...@@ -36,7 +36,7 @@ func (m mockLogger) Name() string { ...@@ -36,7 +36,7 @@ func (m mockLogger) Name() string {
func TestNew(t *testing.T) { func TestNew(t *testing.T) {
t.Run("should return new stdwriter", func(t *testing.T) { t.Run("should return new stdwriter", func(t *testing.T) {
writer := New(new(mockLogger)) writer := New(new(mockLogger), "proc")
_, ok := writer.(*stdwriter) _, ok := writer.(*stdwriter)
assert.True(t, ok) assert.True(t, ok)
}) })
...@@ -45,7 +45,7 @@ func TestNew(t *testing.T) { ...@@ -45,7 +45,7 @@ func TestNew(t *testing.T) {
func TestWrite(t *testing.T) { func TestWrite(t *testing.T) {
t.Run("should write to buffer", func(t *testing.T) { t.Run("should write to buffer", func(t *testing.T) {
logger := new(mockLogger) logger := new(mockLogger)
writer := New(logger).(*stdwriter) writer := New(logger, "proc").(*stdwriter)
_, err := writer.Write([]byte("test")) _, err := writer.Write([]byte("test"))
require.NoError(t, err) require.NoError(t, err)
...@@ -54,18 +54,18 @@ func TestWrite(t *testing.T) { ...@@ -54,18 +54,18 @@ func TestWrite(t *testing.T) {
t.Run("should flush on newline", func(t *testing.T) { t.Run("should flush on newline", func(t *testing.T) {
logger := new(mockLogger) logger := new(mockLogger)
writer := New(logger).(*stdwriter) writer := New(logger, "proc").(*stdwriter)
_, err := writer.Write([]byte("test\n")) _, err := writer.Write([]byte("test\n"))
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 0, writer.buf.Len()) assert.Equal(t, 0, writer.buf.Len())
_ = assert.Len(t, logger.msgs, 1) && assert.Equal(t, "TestLogger: test", logger.msgs[0]) _ = assert.Len(t, logger.msgs, 1) && assert.Equal(t, "TestLogger/proc: test", logger.msgs[0])
}) })
t.Run("should return error when closed", func(t *testing.T) { t.Run("should return error when closed", func(t *testing.T) {
writer := New(new(mockLogger)).(*stdwriter) writer := New(new(mockLogger), "proc").(*stdwriter)
writer.Close() writer.Close()
_, err := writer.Write([]byte("test\n")) _, err := writer.Write([]byte("test\n"))
...@@ -77,19 +77,19 @@ func TestWrite(t *testing.T) { ...@@ -77,19 +77,19 @@ func TestWrite(t *testing.T) {
func TestClose(t *testing.T) { func TestClose(t *testing.T) {
t.Run("should flush and close", func(t *testing.T) { t.Run("should flush and close", func(t *testing.T) {
logger := new(mockLogger) logger := new(mockLogger)
writer := New(logger).(*stdwriter) writer := New(logger, "proc").(*stdwriter)
writer.Write([]byte("test")) writer.Write([]byte("test"))
writer.Close() writer.Close()
assert.Equal(t, 0, writer.buf.Len()) assert.Equal(t, 0, writer.buf.Len())
_ = assert.Equal(t, 1, len(logger.msgs)) && assert.Equal(t, "TestLogger: test", logger.msgs[0]) _ = assert.Equal(t, 1, len(logger.msgs)) && assert.Equal(t, "TestLogger/proc: test", logger.msgs[0])
}) })
} }
func TestConcurrency(t *testing.T) { func TestConcurrency(t *testing.T) {
t.Run("should handle concurrent writes", func(t *testing.T) { t.Run("should handle concurrent writes", func(t *testing.T) {
logger := new(mockLogger) logger := new(mockLogger)
writer := New(logger).(*stdwriter) writer := New(logger, "proc").(*stdwriter)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(2) wg.Add(2)
...@@ -113,7 +113,7 @@ func TestConcurrency(t *testing.T) { ...@@ -113,7 +113,7 @@ func TestConcurrency(t *testing.T) {
assert.Len(t, logger.msgs, 2000) assert.Len(t, logger.msgs, 2000)
for _, msg := range logger.msgs { for _, msg := range logger.msgs {
assert.Contains(t, msg, "TestLogger: test ") assert.Contains(t, msg, "TestLogger/proc: test ")
} }
}) })
} }
...@@ -15,6 +15,7 @@ package kill ...@@ -15,6 +15,7 @@ package kill
import ( import (
"os/exec" "os/exec"
"path/filepath"
"testing" "testing"
"time" "time"
) )
...@@ -33,4 +34,11 @@ func Kill(t *testing.T, cmd *exec.Cmd) { ...@@ -33,4 +34,11 @@ func Kill(t *testing.T, cmd *exec.Cmd) {
interrupt(t, cmd) interrupt(t, cmd)
time.Sleep(time.Millisecond * 300) time.Sleep(time.Millisecond * 300)
interrupt(t, cmd) interrupt(t, cmd)
if filepath.Base(cmd.Path) == "daprd" {
// TODO: daprd does not currently gracefully exit on a single interrupt
// signal. Remove once fixed.
time.Sleep(time.Millisecond * 300)
interrupt(t, cmd)
}
} }
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package exec
import "io"
func WithStdout(stdout io.WriteCloser) Option {
return func(o *options) {
o.stdout = stdout
}
}
func WithStderr(stderr io.WriteCloser) Option {
return func(o *options) {
o.stderr = stderr
}
}
func WithRunError(ferr func(error)) Option {
return func(o *options) {
o.runErrorFn = ferr
}
}
func WithExitCode(code int) Option {
return func(o *options) {
o.exitCode = code
}
}
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package placement
import "github.com/dapr/dapr/tests/integration/framework/process/exec"
func WithExecOptions(execOptions ...exec.Option) Option {
return func(o *options) {
o.execOpts = execOptions
}
}
func WithPort(port int) Option {
return func(o *options) {
o.port = port
}
}
func WithID(id string) Option {
return func(o *options) {
o.id = id
}
}
func WithHealthzPort(port int) Option {
return func(o *options) {
o.healthzPort = port
}
}
func WithMetricsPort(port int) Option {
return func(o *options) {
o.metricsPort = port
}
}
func WithInitialCluster(initialCluster string) Option {
return func(o *options) {
o.initialCluster = initialCluster
}
}
func WithInitialClusterPorts(ports []int) Option {
return func(o *options) {
o.initialClusterPorts = ports
}
}
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package placement
import (
"context"
"os"
"strconv"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework/freeport"
"github.com/dapr/dapr/tests/integration/framework/process"
"github.com/dapr/dapr/tests/integration/framework/process/exec"
)
// options contains the options for running Placement in integration tests.
type options struct {
execOpts []exec.Option
id string
port int
healthzPort int
metricsPort int
initialCluster string
initialClusterPorts []int
}
// Option is a function that configures the process.
type Option func(*options)
type Placement struct {
exec process.Interface
freeport *freeport.FreePort
ID string
Port int
HealthzPort int
MetricsPort int
InitialCluster string
InitialClusterPorts []int
}
func New(t *testing.T, fopts ...Option) *Placement {
t.Helper()
uid, err := uuid.NewUUID()
require.NoError(t, err)
fp := freeport.New(t, 4)
opts := options{
id: uid.String(),
port: fp.Port(t, 0),
healthzPort: fp.Port(t, 1),
metricsPort: fp.Port(t, 2),
initialCluster: uid.String() + "=localhost:" + strconv.Itoa(fp.Port(t, 3)),
initialClusterPorts: []int{fp.Port(t, 3)},
}
for _, fopt := range fopts {
fopt(&opts)
}
args := []string{
"--log-level=" + "debug",
"--id=" + opts.id,
"--port=" + strconv.Itoa(opts.port),
"--healthz-port=" + strconv.Itoa(opts.healthzPort),
"--metrics-port=" + strconv.Itoa(opts.metricsPort),
"--initial-cluster=" + opts.initialCluster,
}
return &Placement{
exec: exec.New(t, os.Getenv("DAPR_INTEGRATION_PLACEMENT_PATH"), args, opts.execOpts...),
freeport: fp,
ID: opts.id,
Port: opts.port,
HealthzPort: opts.healthzPort,
MetricsPort: opts.metricsPort,
InitialCluster: opts.initialCluster,
InitialClusterPorts: opts.initialClusterPorts,
}
}
func (p *Placement) Run(t *testing.T, ctx context.Context) {
p.freeport.Free(t)
p.exec.Run(t, ctx)
}
func (p *Placement) Cleanup(t *testing.T) {
p.exec.Cleanup(t)
}
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"context"
"testing"
)
// Interface is an interface for running and cleaning up a process.
type Interface interface {
Run(*testing.T, context.Context)
Cleanup(*testing.T)
}
...@@ -15,12 +15,14 @@ package integration ...@@ -15,12 +15,14 @@ package integration
import ( import (
"context" "context"
"fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"reflect" "reflect"
"runtime" "runtime"
"strconv" "strings"
"sync"
"testing" "testing"
"time" "time"
...@@ -33,73 +35,56 @@ import ( ...@@ -33,73 +35,56 @@ import (
_ "github.com/dapr/dapr/tests/integration/suite/ports" _ "github.com/dapr/dapr/tests/integration/suite/ports"
) )
const (
defaultConcurrency = 3
envConcurrency = "DAPR_INTEGRATION_CONCURRENCY"
envDaprdPath = "DAPR_INTEGRATION_DAPRD_PATH"
)
func RunIntegrationTests(t *testing.T) { func RunIntegrationTests(t *testing.T) {
// Parallelise the integration tests, but don't run more than `conc` (default buildBinaries(t)
// 3) at once.
conc := concurrency(t)
t.Logf("running integration tests with concurrency: %d", conc)
buildDaprd(t)
guard := make(chan struct{}, conc)
for _, tcase := range suite.All() { for _, tcase := range suite.All() {
tcase := tcase tcase := tcase
t.Run(reflect.TypeOf(tcase).Elem().Name(), func(t *testing.T) { tof := reflect.TypeOf(tcase).Elem()
t.Parallel() testName := filepath.Base(tof.PkgPath()) + "/" + tof.Name()
guard <- struct{}{} t.Run(testName, func(t *testing.T) {
t.Cleanup(func() { t.Logf("%s: setting up test case", testName)
<-guard options := tcase.Setup(t)
})
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() defer cancel()
t.Log("setting up test case") t.Log("running framework")
options := tcase.Setup(t, ctx) f := framework.Run(t, ctx, options...)
t.Log("running daprd")
daprd := framework.RunDaprd(t, ctx, options...)
t.Log("running test case") t.Log("running test case")
tcase.Run(t, ctx, daprd) tcase.Run(t, ctx)
t.Log("cleaning up test case") t.Log("cleaning up framework")
daprd.Cleanup(t) f.Cleanup(t)
t.Log("done") t.Log("done")
}) })
} }
} }
func concurrency(t *testing.T) int { func buildBinaries(t *testing.T) {
conc := defaultConcurrency t.Helper()
concS, ok := os.LookupEnv(envConcurrency)
if ok {
var err error
conc, err = strconv.Atoi(concS)
if err != nil {
t.Fatalf("failed to parse %q: %s", envConcurrency, err)
}
if conc < 1 {
t.Fatalf("%q must be >= 1", envConcurrency)
}
}
return conc binaryNames := []string{"daprd", "placement"}
var wg sync.WaitGroup
wg.Add(len(binaryNames))
for _, name := range binaryNames {
go func(name string) {
defer wg.Done()
buildBinary(t, name)
}(name)
}
wg.Wait()
} }
func buildDaprd(t *testing.T) { func buildBinary(t *testing.T, name string) {
if _, ok := os.LookupEnv(envDaprdPath); !ok { t.Helper()
t.Logf("%q not set, building daprd binary", envDaprdPath) env := fmt.Sprintf("DAPR_INTEGRATION_%s_PATH", strings.ToUpper(name))
if _, ok := os.LookupEnv(env); !ok {
t.Logf("%q not set, building %s binary", env, name)
_, tfile, _, ok := runtime.Caller(0) _, tfile, _, ok := runtime.Caller(0)
require.True(t, ok) require.True(t, ok)
...@@ -107,22 +92,22 @@ func buildDaprd(t *testing.T) { ...@@ -107,22 +92,22 @@ func buildDaprd(t *testing.T) {
// Use a consistent temp dir for the binary so that the binary is cached on // Use a consistent temp dir for the binary so that the binary is cached on
// subsequent runs. // subsequent runs.
daprdPath := filepath.Join(os.TempDir(), "dapr_integration_tests/daprd") binPath := filepath.Join(os.TempDir(), "dapr_integration_tests/"+name)
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
daprdPath += ".exe" binPath += ".exe"
} }
// Ensure CGO is disabled to avoid linking against system libraries. // Ensure CGO is disabled to avoid linking against system libraries.
t.Setenv("CGO_ENABLED", "0") os.Setenv("CGO_ENABLED", "0")
t.Logf("Root dir: %q", rootDir) t.Logf("Root dir: %q", rootDir)
t.Logf("Building daprd binary to: %q", daprdPath) t.Logf("Compiling %q binary to: %q", name, binPath)
cmd := exec.Command("go", "build", "-tags=allcomponents", "-v", "-o", daprdPath, filepath.Join(rootDir, "cmd/daprd")) cmd := exec.Command("go", "build", "-tags=allcomponents", "-v", "-o", binPath, filepath.Join(rootDir, "cmd/"+name))
cmd.Dir = rootDir cmd.Dir = rootDir
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
require.NoError(t, cmd.Run()) require.NoError(t, cmd.Run())
t.Setenv(envDaprdPath, daprdPath) require.NoError(t, os.Setenv(env, binPath))
} }
} }
...@@ -21,5 +21,6 @@ import ( ...@@ -21,5 +21,6 @@ import (
) )
func Test_Integration(t *testing.T) { func Test_Integration(t *testing.T) {
t.Parallel()
RunIntegrationTests(t) RunIntegrationTests(t)
} }
...@@ -28,23 +28,24 @@ import ( ...@@ -28,23 +28,24 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework" "github.com/dapr/dapr/tests/integration/framework"
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/suite" "github.com/dapr/dapr/tests/integration/suite"
) )
func init() { func init() {
suite.Register(new(AppHealthz)) suite.Register(new(app))
} }
// AppHealthz tests that Dapr responds to healthz requests for the app. // app tests that Dapr responds to healthz requests for the app.
type AppHealthz struct { type app struct {
healthy atomic.Bool daprd *procdaprd.Daprd
server http.Server healthy atomic.Bool
done chan struct{} server http.Server
listener net.Listener
} }
func (a *AppHealthz) Setup(t *testing.T, _ context.Context) []framework.RunDaprdOption { func (a *app) Setup(t *testing.T) []framework.Option {
a.healthy.Store(true) a.healthy.Store(true)
a.done = make(chan struct{})
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/foo", func(w http.ResponseWriter, r *http.Request) {
...@@ -61,36 +62,47 @@ func (a *AppHealthz) Setup(t *testing.T, _ context.Context) []framework.RunDaprd ...@@ -61,36 +62,47 @@ func (a *AppHealthz) Setup(t *testing.T, _ context.Context) []framework.RunDaprd
fmt.Fprintf(w, "%s %s", r.Method, r.URL.Path) fmt.Fprintf(w, "%s %s", r.Method, r.URL.Path)
}) })
listener, err := net.Listen("tcp", "127.0.0.1:0") var err error
a.listener, err = net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
a.server = http.Server{ a.server = http.Server{
Handler: mux, Handler: mux,
ReadHeaderTimeout: 5 * time.Second, ReadHeaderTimeout: 5 * time.Second,
} }
go func() { a.daprd = procdaprd.New(t,
defer close(a.done) procdaprd.WithAppHealthCheck(true),
require.ErrorIs(t, a.server.Serve(listener), http.ErrServerClosed) procdaprd.WithAppHealthCheckPath("/foo"),
}() procdaprd.WithAppPort(a.listener.Addr().(*net.TCPAddr).Port),
procdaprd.WithAppHealthProbeInterval(1),
procdaprd.WithAppHealthProbeThreshold(1),
)
return []framework.RunDaprdOption{ return []framework.Option{
framework.WithAppHealthCheck(true), framework.WithProcesses(a.daprd),
framework.WithAppHealthCheckPath("/foo"),
framework.WithAppPort(listener.Addr().(*net.TCPAddr).Port),
framework.WithAppHealthProbeInterval(1),
framework.WithAppHealthProbeThreshold(1),
} }
} }
func (a *AppHealthz) Run(t *testing.T, ctx context.Context, cmd *framework.Command) { func (a *app) Run(t *testing.T, ctx context.Context) {
done := make(chan struct{})
go func() {
defer close(done)
require.ErrorIs(t, a.server.Serve(a.listener), http.ErrServerClosed)
}()
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", cmd.InternalGRPCPort)) conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", a.daprd.InternalGRPCPort))
return err == nil if err != nil {
return false
}
require.NoError(t, conn.Close())
return true
}, time.Second*5, 100*time.Millisecond) }, time.Second*5, 100*time.Millisecond)
a.healthy.Store(true) a.healthy.Store(true)
reqURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/myfunc", cmd.HTTPPort, cmd.AppID) reqURL := fmt.Sprintf("http://localhost:%d/v1.0/invoke/%s/method/myfunc", a.daprd.HTTPPort, a.daprd.AppID)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
...@@ -124,7 +136,7 @@ func (a *AppHealthz) Run(t *testing.T, ctx context.Context, cmd *framework.Comma ...@@ -124,7 +136,7 @@ func (a *AppHealthz) Run(t *testing.T, ctx context.Context, cmd *framework.Comma
require.NoError(t, a.server.Shutdown(ctx)) require.NoError(t, a.server.Shutdown(ctx))
select { select {
case <-a.done: case <-done:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
t.Error("timed out waiting for healthz server to close") t.Error("timed out waiting for healthz server to close")
} }
......
...@@ -25,23 +25,29 @@ import ( ...@@ -25,23 +25,29 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework" "github.com/dapr/dapr/tests/integration/framework"
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/suite" "github.com/dapr/dapr/tests/integration/suite"
) )
func init() { func init() {
suite.Register(new(Healthz)) suite.Register(new(daprd))
} }
// Healthz tests that Dapr responds to healthz requests. // daprd tests that Dapr responds to healthz requests.
type Healthz struct{} type daprd struct {
proc *procdaprd.Daprd
}
func (h *Healthz) Setup(t *testing.T, _ context.Context) []framework.RunDaprdOption { func (d *daprd) Setup(t *testing.T) []framework.Option {
return nil d.proc = procdaprd.New(t)
return []framework.Option{
framework.WithProcesses(d.proc),
}
} }
func (h *Healthz) Run(t *testing.T, ctx context.Context, cmd *framework.Command) { func (d *daprd) Run(t *testing.T, ctx context.Context) {
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", cmd.PublicPort)) conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", d.proc.PublicPort))
if err != nil { if err != nil {
return false return false
} }
...@@ -49,7 +55,7 @@ func (h *Healthz) Run(t *testing.T, ctx context.Context, cmd *framework.Command) ...@@ -49,7 +55,7 @@ func (h *Healthz) Run(t *testing.T, ctx context.Context, cmd *framework.Command)
return true return true
}, time.Second*5, 100*time.Millisecond) }, time.Second*5, 100*time.Millisecond)
reqURL := fmt.Sprintf("http://localhost:%d/v1.0/healthz", cmd.PublicPort) reqURL := fmt.Sprintf("http://localhost:%d/v1.0/healthz", d.proc.PublicPort)
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
......
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthz
import (
"context"
"fmt"
"net"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework"
procplace "github.com/dapr/dapr/tests/integration/framework/process/placement"
"github.com/dapr/dapr/tests/integration/suite"
)
func init() {
suite.Register(new(placement))
}
// placement tests that Dapr responds to healthz requests.
type placement struct {
proc *procplace.Placement
}
func (d *placement) Setup(t *testing.T) []framework.Option {
d.proc = procplace.New(t)
return []framework.Option{
framework.WithProcesses(d.proc),
}
}
func (d *placement) Run(t *testing.T, ctx context.Context) {
assert.Eventuallyf(t, func() bool {
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", d.proc.HealthzPort))
if err != nil {
return false
}
require.NoError(t, conn.Close())
return true
}, time.Second*5, 100*time.Millisecond, "healthz port %d not ready", d.proc.HealthzPort)
reqURL := fmt.Sprintf("http://127.0.0.1:%d/healthz", d.proc.HealthzPort)
assert.Eventually(t, func() bool {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
return http.StatusOK == resp.StatusCode
}, time.Second*10, 100*time.Millisecond)
}
...@@ -27,27 +27,37 @@ import ( ...@@ -27,27 +27,37 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework" "github.com/dapr/dapr/tests/integration/framework"
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/suite" "github.com/dapr/dapr/tests/integration/suite"
) )
func init() { func init() {
suite.Register(new(Metadata)) suite.Register(new(metadata))
} }
// Metadata tests Dapr's response to metadata API requests. // metadata tests Dapr's response to metadata API requests.
type Metadata struct{} type metadata struct {
proc *procdaprd.Daprd
}
func (*Metadata) Setup(*testing.T, context.Context) []framework.RunDaprdOption { func (m *metadata) Setup(t *testing.T) []framework.Option {
return nil m.proc = procdaprd.New(t)
return []framework.Option{
framework.WithProcesses(m.proc),
}
} }
func (*Metadata) Run(t *testing.T, ctx context.Context, cmd *framework.Command) { func (m *metadata) Run(t *testing.T, ctx context.Context) {
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
_, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", cmd.InternalGRPCPort)) conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", m.proc.InternalGRPCPort))
return err == nil if err != nil {
return false
}
require.NoError(t, conn.Close())
return true
}, time.Second*5, 100*time.Millisecond) }, time.Second*5, 100*time.Millisecond)
reqURL := fmt.Sprintf("http://localhost:%d/v1.0/metadata", cmd.PublicPort) reqURL := fmt.Sprintf("http://localhost:%d/v1.0/metadata", m.proc.PublicPort)
ctx, cancel := context.WithTimeout(ctx, time.Second*5) ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel() defer cancel()
...@@ -62,7 +72,7 @@ func (*Metadata) Run(t *testing.T, ctx context.Context, cmd *framework.Command) ...@@ -62,7 +72,7 @@ func (*Metadata) Run(t *testing.T, ctx context.Context, cmd *framework.Command)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, resp.Body.Close()) require.NoError(t, resp.Body.Close())
validateResponse(t, cmd.AppID, cmd.AppPort, string(resBody)) validateResponse(t, m.proc.AppID, m.proc.AppPort, string(resBody))
} }
// validateResponse asserts that the response body is valid JSON // validateResponse asserts that the response body is valid JSON
......
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ports
import (
"context"
"fmt"
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework"
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/suite"
)
func init() {
suite.Register(new(daprd))
}
// daprd tests that the ports are available when daprd is running.
type daprd struct {
proc *procdaprd.Daprd
}
func (d *daprd) Setup(t *testing.T) []framework.Option {
d.proc = procdaprd.New(t)
return []framework.Option{
framework.WithProcesses(d.proc),
}
}
func (d *daprd) Run(t *testing.T, _ context.Context) {
for name, port := range map[string]int{
"app": d.proc.AppPort,
"grpc": d.proc.GRPCPort,
"http": d.proc.HTTPPort,
"metrics": d.proc.MetricsPort,
"internal-grpc": d.proc.InternalGRPCPort,
"public": d.proc.PublicPort,
} {
assert.Eventuallyf(t, func() bool {
conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
return false
}
require.NoError(t, conn.Close())
return true
}, time.Second*5, 100*time.Millisecond, "port %s (:%d) was not available in time", name, port)
}
}
...@@ -21,34 +21,43 @@ import ( ...@@ -21,34 +21,43 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/tests/integration/framework" "github.com/dapr/dapr/tests/integration/framework"
procplace "github.com/dapr/dapr/tests/integration/framework/process/placement"
"github.com/dapr/dapr/tests/integration/suite" "github.com/dapr/dapr/tests/integration/suite"
) )
func init() { func init() {
suite.Register(new(Ports)) suite.Register(new(placement))
} }
// Ports tests that the ports are available when the app is running. // placement tests that the ports are available when daprd is running.
type Ports struct{} type placement struct {
proc *procplace.Placement
}
func (p *Ports) Setup(t *testing.T, _ context.Context) []framework.RunDaprdOption { func (p *placement) Setup(t *testing.T) []framework.Option {
return nil p.proc = procplace.New(t)
return []framework.Option{
framework.WithProcesses(p.proc),
}
} }
func (p *Ports) Run(t *testing.T, _ context.Context, cmd *framework.Command) { func (p *placement) Run(t *testing.T, _ context.Context) {
for name, port := range map[string]int{ for name, port := range map[string]int{
"app": cmd.AppPort, "port": p.proc.Port,
"grpc": cmd.GRPCPort, "metrics": p.proc.MetricsPort,
"http": cmd.HTTPPort, "healthz": p.proc.HealthzPort,
"metrics": cmd.MetricsPort, "initialCluster": p.proc.InitialClusterPorts[0],
"internal-grpc": cmd.InternalGRPCPort,
"public": cmd.PublicPort,
} { } {
assert.Eventuallyf(t, func() bool { assert.Eventuallyf(t, func() bool {
_, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port)) conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", port))
return err == nil if err != nil {
return false
}
require.NoError(t, conn.Close())
return true
}, time.Second*5, 100*time.Millisecond, "port %s (:%d) was not available in time", name, port) }, time.Second*5, 100*time.Millisecond, "port %s (:%d) was not available in time", name, port)
} }
} }
...@@ -24,8 +24,8 @@ var cases []Case ...@@ -24,8 +24,8 @@ var cases []Case
// Case is a test case for the integration test suite. // Case is a test case for the integration test suite.
type Case interface { type Case interface {
Setup(*testing.T, context.Context) []framework.RunDaprdOption Setup(*testing.T) []framework.Option
Run(*testing.T, context.Context, *framework.Command) Run(*testing.T, context.Context)
} }
// Register registers a test case. // Register registers a test case.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册