| package roadrunner
import (
	"github.com/stretchr/testify/assert"
	"log"
	"os/exec"
	"runtime"
	"strconv"
	"sync"
	"testing"
	"time"
)
var cfg = Config{
	NumWorkers:      uint64(runtime.NumCPU()),
	AllocateTimeout: time.Second,
	DestroyTimeout:  time.Second,
}
func Test_NewPool(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	assert.Equal(t, cfg, p.Config())
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
}
func Test_Pool_Invalid(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
		NewPipeFactory(),
		cfg,
	)
	assert.Nil(t, p)
	assert.Error(t, err)
}
func Test_ConfigError(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		Config{
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
		},
	)
	assert.Nil(t, p)
	assert.Error(t, err)
}
func Test_Pool_Echo(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
	res, err := p.Exec(&Payload{Body: []byte("hello")})
	assert.NoError(t, err)
	assert.NotNil(t, res)
	assert.NotNil(t, res.Body)
	assert.Nil(t, res.Context)
	assert.Equal(t, "hello", res.String())
}
func Test_Pool_Echo_NilContext(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
	res, err := p.Exec(&Payload{Body: []byte("hello"), Context: nil})
	assert.NoError(t, err)
	assert.NotNil(t, res)
	assert.NotNil(t, res.Body)
	assert.Nil(t, res.Context)
	assert.Equal(t, "hello", res.String())
}
func Test_Pool_Echo_Context(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
	res, err := p.Exec(&Payload{Body: []byte("hello"), Context: []byte("world")})
	assert.NoError(t, err)
	assert.NotNil(t, res)
	assert.Nil(t, res.Body)
	assert.NotNil(t, res.Context)
	assert.Equal(t, "world", string(res.Context))
}
func Test_Pool_JobError(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
	res, err := p.Exec(&Payload{Body: []byte("hello")})
	assert.Error(t, err)
	assert.Nil(t, res)
	assert.IsType(t, JobError{}, err)
	assert.Equal(t, "hello", err.Error())
}
func Test_Pool_Broken_Replace(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
	p.Observer = func(e int, w *Worker, ctx interface{}) {
		if err, ok := ctx.(error); ok {
			assert.Contains(t, err.Error(), "undefined_function()")
		}
	}
	res, err := p.Exec(&Payload{Body: []byte("hello")})
	assert.Error(t, err)
	assert.Nil(t, res)
}
func Test_Pool_AllocateTimeout(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
		NewPipeFactory(),
		Config{
			NumWorkers:      1,
			AllocateTimeout: time.Millisecond * 50,
			DestroyTimeout:  time.Second,
		},
	)
	assert.NotNil(t, p)
	assert.NoError(t, err)
	done := make(chan interface{})
	go func() {
		_, err := p.Exec(&Payload{Body: []byte("100")})
		assert.NoError(t, err)
		close(done)
	}()
	// to ensure that worker is already busy
	time.Sleep(time.Millisecond * 10)
	_, err = p.Exec(&Payload{Body: []byte("10")})
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "worker timeout")
	<-done
	p.Destroy()
}
func Test_Pool_Replace_Worker(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
		NewPipeFactory(),
		Config{
			NumWorkers:      1,
			MaxExecutions:   1,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
		},
	)
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
	var lastPID string
	lastPID = strconv.Itoa(*p.Workers()[0].Pid)
	res, err := p.Exec(&Payload{Body: []byte("hello")})
	assert.Equal(t, lastPID, string(res.Body))
	for i := 0; i < 10; i++ {
		res, err := p.Exec(&Payload{Body: []byte("hello")})
		assert.NoError(t, err)
		assert.NotNil(t, res)
		assert.NotNil(t, res.Body)
		assert.Nil(t, res.Context)
		assert.NotEqual(t, lastPID, string(res.Body))
		lastPID = string(res.Body)
	}
}
// identical to replace but controlled on worker side
func Test_Pool_Stop_Worker(t *testing.T) {
	p, err := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
		NewPipeFactory(),
		Config{
			NumWorkers:      1,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
		},
	)
	defer p.Destroy()
	assert.NotNil(t, p)
	assert.NoError(t, err)
	var lastPID string
	lastPID = strconv.Itoa(*p.Workers()[0].Pid)
	res, err := p.Exec(&Payload{Body: []byte("hello")})
	assert.Equal(t, lastPID, string(res.Body))
	for i := 0; i < 10; i++ {
		res, err := p.Exec(&Payload{Body: []byte("hello")})
		assert.NoError(t, err)
		assert.NotNil(t, res)
		assert.NotNil(t, res.Body)
		assert.Nil(t, res.Context)
		assert.NotEqual(t, lastPID, string(res.Body))
		lastPID = string(res.Body)
	}
}
func Benchmark_Pool_Allocate(b *testing.B) {
	p, _ := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	for n := 0; n < b.N; n++ {
		w, err := p.allocateWorker()
		if err != nil {
			b.Fail()
			log.Println(err)
		}
		p.free <- w
	}
}
func Benchmark_Pool_Echo(b *testing.B) {
	p, _ := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	for n := 0; n < b.N; n++ {
		if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
			b.Fail()
		}
	}
}
func Benchmark_Pool_Echo_Batched(b *testing.B) {
	p, _ := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		cfg,
	)
	defer p.Destroy()
	var wg sync.WaitGroup
	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
				b.Fail()
				log.Println(err)
			}
		}()
	}
	wg.Wait()
}
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
	p, _ := NewPool(
		func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
		NewPipeFactory(),
		Config{
			NumWorkers:      1,
			MaxExecutions:   1,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
		},
	)
	defer p.Destroy()
	for n := 0; n < b.N; n++ {
		if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
			b.Fail()
			log.Println(err)
		}
	}
}
 |