Skip to content

Commit

Permalink
Merge pull request #123 from traP-jp/polling-properly
Browse files Browse the repository at this point in the history
メッセージのポーリング取得を漏らさず行うようにした
  • Loading branch information
oribe1115 authored Dec 15, 2023
2 parents 2f38b45 + 15497d8 commit f03b974
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
5 changes: 2 additions & 3 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ func main() {
slog.Info("Error setting up: %v", err)
}

go func() {
traqmessage.PollingMessages()
}()
messagePoller := traqmessage.NewMessagePoller()
go messagePoller.Run()

instance.Logger.Fatal(instance.Start(":8080"))
}
70 changes: 58 additions & 12 deletions server/traqmessage/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,82 @@ import (
"fmt"
"h23s_15/model"
"strings"
"sync"
"time"

"github.com/traPtitech/go-traq"
"golang.org/x/exp/slog"
)

type MessagePoller struct {
processer *messageProcesser
}

func NewMessagePoller() *MessagePoller {
return &MessagePoller{
processer: &messageProcesser{
queue: make(chan *[]traq.Message),
},
}
}

// go routineの中で呼ぶこと
func PollingMessages() {
func (m *MessagePoller) Run() {
go m.processer.run()

pollingInterval := time.Minute * 3

lastCheckpoint := time.Now()
ticker := time.Tick(pollingInterval)
var checkpointMutex sync.Mutex

ticker := time.Tick(pollingInterval)
for range ticker {
checkpointMutex.Lock()

now := time.Now()
messages, err := collectMessages(lastCheckpoint, now)
if err != nil {
slog.Error(fmt.Sprintf("Failled to polling messages: %v", err))
continue
var collectedMessageCount int64
for i := 0; ; i++ {
messages, err := collectMessages(lastCheckpoint, now, i)
if err != nil {
slog.Error(fmt.Sprintf("Failled to polling messages: %v", err))
continue
}

collectedMessageCount += messages.TotalHits

// 取得したメッセージを使っての処理の呼び出し
m.processer.enqueue(&messages.Hits)

if messages.TotalHits < 100 {
break
}
}

slog.Info(fmt.Sprintf("Collect %d messages", collectedMessageCount))

lastCheckpoint = now
checkpointMutex.Unlock()
}
}

slog.Info(fmt.Sprintf("Collect %d messages", len(messages.Hits)))
// TODO: 取得したメッセージを使っての処理の呼び出し
messageProcessor(messages.Hits)
// 通知メッセージの検索と通知処理のjobを処理する
type messageProcesser struct {
queue chan *[]traq.Message
}

// go routineの中で呼ぶ
func (m *messageProcesser) run() {
select {
case messages := <-m.queue:
m.process(*messages)
}
}

func messageProcessor(messages []traq.Message) {
func (m *messageProcesser) enqueue(messages *[]traq.Message) {
m.queue <- messages
}

func (m *messageProcesser) process(messages []traq.Message) {
messageList, err := ConvertMessageHits(messages)
if err != nil {
slog.Error(fmt.Sprintf("Failled to convert messages: %v", err))
Expand Down Expand Up @@ -87,7 +133,7 @@ func sendMessage(notifyTargetTraqUUID string, messageContent string) error {
return nil
}

func collectMessages(from time.Time, to time.Time) (*traq.MessageSearchResult, error) {
func collectMessages(from time.Time, to time.Time, offset int) (*traq.MessageSearchResult, error) {
if model.ACCESS_TOKEN == "" {
slog.Info("Skip collectMessage")
return &traq.MessageSearchResult{}, nil
Expand All @@ -98,7 +144,7 @@ func collectMessages(from time.Time, to time.Time) (*traq.MessageSearchResult, e

// 1度での取得上限は100まで それ以上はoffsetを使うこと
// https://github.com/traPtitech/traQ/blob/47ed2cf94b2209c8444533326dee2a588936d5e0/service/search/engine.go#L51
result, _, err := client.MessageApi.SearchMessages(auth).After(from).Before(to).Limit(100).Execute()
result, _, err := client.MessageApi.SearchMessages(auth).After(from).Before(to).Limit(100).Offset(int32(offset)).Execute()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f03b974

Please sign in to comment.