summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorErik Winter <ik@erikwinter.nl>2021-05-15 11:46:03 +0200
committerErik Winter <ik@erikwinter.nl>2021-05-15 11:49:18 +0200
commit16ca03e746675e1933dd4f7d1e0b3ec587ae05f9 (patch)
tree4d560dc712e2ef2a40388f88ef6dce4340bcdabd
parentd8054059f7d7bc37ae867840b3f0216e6d68d610 (diff)
mutex for processes
-rw-r--r--cmd/daemon/service.go10
-rw-r--r--cmd/generate-recurring/main.go2
-rw-r--r--cmd/process-inbox/main.go2
-rw-r--r--internal/process/inbox.go15
-rw-r--r--internal/process/inbox_test.go23
-rw-r--r--internal/process/recur.go15
-rw-r--r--internal/process/recur_test.go13
7 files changed, 48 insertions, 32 deletions
diff --git a/cmd/daemon/service.go b/cmd/daemon/service.go
index a80045d..5cd968a 100644
--- a/cmd/daemon/service.go
+++ b/cmd/daemon/service.go
@@ -51,7 +51,7 @@ func main() {
func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger) {
logger = logger.WithField("func", "run")
- inboxTicker := time.NewTicker(30 * time.Second)
+ inboxTicker := time.NewTicker(10 * time.Second)
recurTicker := time.NewTicker(time.Hour)
oldToday := task.Today
@@ -64,7 +64,9 @@ func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger)
continue
}
- logger.WithField("count", result.Count).Info("finished processing inbox")
+ if result.Count > 0 {
+ logger.WithField("result", result).Info("finished processing inbox")
+ }
case <-recurTicker.C:
year, month, day := time.Now().Date()
newToday := task.NewDate(year, int(month), day)
@@ -80,7 +82,9 @@ func Run(inboxProc *process.Inbox, recurProc *process.Recur, logger log.Logger)
continue
}
- logger.WithField("count", result.Count).Info("finished generating recurring tasks")
+ if result.Count > 0 {
+ logger.WithField("result", result).Info("finished generating recurring tasks")
+ }
}
}
}
diff --git a/cmd/generate-recurring/main.go b/cmd/generate-recurring/main.go
index 58bfae9..7aaaf74 100644
--- a/cmd/generate-recurring/main.go
+++ b/cmd/generate-recurring/main.go
@@ -38,5 +38,5 @@ func main() {
os.Exit(1)
}
- logger.WithField("count", result.Count).Info("finished generating recurring tasks")
+ logger.WithField("result", result).Info("finished generating recurring tasks")
}
diff --git a/cmd/process-inbox/main.go b/cmd/process-inbox/main.go
index 71fa25d..7b00331 100644
--- a/cmd/process-inbox/main.go
+++ b/cmd/process-inbox/main.go
@@ -31,5 +31,5 @@ func main() {
logger.WithErr(err).Error("unable to process inbox")
os.Exit(1)
}
- logger.WithField("count", result.Count).Info("finished processing inbox")
+ logger.WithField("result", result).Info("finished processing inbox")
}
diff --git a/internal/process/inbox.go b/internal/process/inbox.go
index 39ca635..ff3418e 100644
--- a/internal/process/inbox.go
+++ b/internal/process/inbox.go
@@ -3,12 +3,16 @@ package process
import (
"errors"
"fmt"
+ "sync"
+ "time"
"git.ewintr.nl/gte/internal/task"
)
var (
ErrInboxProcess = errors.New("could not process inbox")
+
+ inboxLock sync.Mutex
)
type Inbox struct {
@@ -16,7 +20,8 @@ type Inbox struct {
}
type InboxResult struct {
- Count int
+ Duration string `json:"duration"`
+ Count int `json:"count"`
}
func NewInbox(repo *task.TaskRepo) *Inbox {
@@ -26,6 +31,11 @@ func NewInbox(repo *task.TaskRepo) *Inbox {
}
func (inbox *Inbox) Process() (*InboxResult, error) {
+ inboxLock.Lock()
+ defer inboxLock.Unlock()
+
+ start := time.Now()
+
tasks, err := inbox.taskRepo.FindAll(task.FOLDER_INBOX)
if err != nil {
return &InboxResult{}, fmt.Errorf("%w: %v", ErrInboxProcess, err)
@@ -47,6 +57,7 @@ func (inbox *Inbox) Process() (*InboxResult, error) {
}
return &InboxResult{
- Count: len(tasks),
+ Duration: time.Since(start).String(),
+ Count: len(tasks),
}, nil
}
diff --git a/internal/process/inbox_test.go b/internal/process/inbox_test.go
index 9507c55..8d85c69 100644
--- a/internal/process/inbox_test.go
+++ b/internal/process/inbox_test.go
@@ -11,17 +11,16 @@ import (
func TestInboxProcess(t *testing.T) {
for _, tc := range []struct {
- name string
- messages map[string][]*mstore.Message
- expResult *process.InboxResult
- expMsgs map[string][]*mstore.Message
+ name string
+ messages map[string][]*mstore.Message
+ expCount int
+ expMsgs map[string][]*mstore.Message
}{
{
name: "empty",
messages: map[string][]*mstore.Message{
task.FOLDER_INBOX: {},
},
- expResult: &process.InboxResult{},
expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {},
},
@@ -47,9 +46,7 @@ func TestInboxProcess(t *testing.T) {
},
},
},
- expResult: &process.InboxResult{
- Count: 4,
- },
+ expCount: 4,
expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {},
task.FOLDER_NEW: {{Subject: "to new"}},
@@ -70,9 +67,7 @@ func TestInboxProcess(t *testing.T) {
Body: "id: xxx-xxx\nversion: 3",
}},
},
- expResult: &process.InboxResult{
- Count: 1,
- },
+ expCount: 1,
expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {},
task.FOLDER_UNPLANNED: {{Subject: "new version"}},
@@ -90,9 +85,7 @@ func TestInboxProcess(t *testing.T) {
Body: "id: xxx-xxx\nversion: 5",
}},
},
- expResult: &process.InboxResult{
- Count: 1,
- },
+ expCount: 1,
expMsgs: map[string][]*mstore.Message{
task.FOLDER_INBOX: {},
task.FOLDER_UNPLANNED: {{Subject: "not really old version"}},
@@ -118,7 +111,7 @@ func TestInboxProcess(t *testing.T) {
actResult, err := inboxProc.Process()
test.OK(t, err)
- test.Equals(t, tc.expResult, actResult)
+ test.Equals(t, tc.expCount, actResult.Count)
for folder, expMessages := range tc.expMsgs {
actMessages, err := mstorer.Messages(folder)
test.OK(t, err)
diff --git a/internal/process/recur.go b/internal/process/recur.go
index 7347b94..b253af5 100644
--- a/internal/process/recur.go
+++ b/internal/process/recur.go
@@ -3,12 +3,16 @@ package process
import (
"errors"
"fmt"
+ "sync"
+ "time"
"git.ewintr.nl/gte/internal/task"
)
var (
ErrRecurProcess = errors.New("could not generate tasks from recurrer")
+
+ recurLock sync.Mutex
)
type Recur struct {
@@ -18,7 +22,8 @@ type Recur struct {
}
type RecurResult struct {
- Count int
+ Duration string `json:"duration"`
+ Count int `json:"count"`
}
func NewRecur(repo *task.TaskRepo, disp *task.Dispatcher, daysAhead int) *Recur {
@@ -30,6 +35,11 @@ func NewRecur(repo *task.TaskRepo, disp *task.Dispatcher, daysAhead int) *Recur
}
func (recur *Recur) Process() (*RecurResult, error) {
+ recurLock.Lock()
+ defer recurLock.Unlock()
+
+ start := time.Now()
+
tasks, err := recur.taskRepo.FindAll(task.FOLDER_RECURRING)
if err != nil {
return &RecurResult{}, fmt.Errorf("%w: %v", ErrRecurProcess, err)
@@ -51,6 +61,7 @@ func (recur *Recur) Process() (*RecurResult, error) {
}
return &RecurResult{
- Count: count,
+ Duration: time.Since(start).String(),
+ Count: count,
}, nil
}
diff --git a/internal/process/recur_test.go b/internal/process/recur_test.go
index 511a3b1..6c5d82c 100644
--- a/internal/process/recur_test.go
+++ b/internal/process/recur_test.go
@@ -15,13 +15,12 @@ func TestRecurProcess(t *testing.T) {
for _, tc := range []struct {
name string
recurMsgs []*mstore.Message
- expResult *process.RecurResult
+ expCount int
expMsgs []*msend.Message
}{
{
- name: "empty",
- expResult: &process.RecurResult{},
- expMsgs: []*msend.Message{},
+ name: "empty",
+ expMsgs: []*msend.Message{},
},
{
name: "one of two recurring",
@@ -35,9 +34,7 @@ func TestRecurProcess(t *testing.T) {
Body: "recur: 2021-05-10, daily\nid: xxx-xxx\nversion: 1",
},
},
- expResult: &process.RecurResult{
- Count: 1,
- },
+ expCount: 1,
expMsgs: []*msend.Message{
{Subject: "2021-05-15 (saturday) - recurring"},
},
@@ -60,7 +57,7 @@ func TestRecurProcess(t *testing.T) {
recurProc := process.NewRecur(task.NewRepository(mstorer), task.NewDispatcher(msender), 1)
actResult, err := recurProc.Process()
test.OK(t, err)
- test.Equals(t, tc.expResult, actResult)
+ test.Equals(t, tc.expCount, actResult.Count)
for i, expMsg := range tc.expMsgs {
test.Equals(t, expMsg.Subject, msender.Messages[i].Subject)
}