plug-sdk/pkg/plug/grpc.go

141 lines
4.0 KiB
Go

package plug
import (
	"context"
	"log"
	"sync"

	"git.anthrove.art/anthrove/otter-space-sdk/v2/pkg/database"
	"git.anthrove.art/anthrove/otter-space-sdk/v2/pkg/models"
	gRPC "git.anthrove.art/anthrove/plug-sdk/v2/pkg/grpc"
	gonanoid "github.com/matoous/go-nanoid/v2"
	"google.golang.org/protobuf/types/known/timestamppb"
)
// server implements gRPC.PlugConnectorServer. It keeps one cancel function per
// started task so that a task can be looked up and stopped by its ID.
type server struct {
	gRPC.UnimplementedPlugConnectorServer

	// mu guards ctx. The goroutine started by TaskStart removes its own entry
	// while RPC handlers read and write the map, and unsynchronized map access
	// from several goroutines is a data race in Go.
	mu sync.Mutex
	// ctx maps a task ID to the cancel function of that task's context.
	ctx map[string]context.CancelFunc

	database              database.OtterSpace
	taskExecutionFunction TaskExecution
	sendMessageExecution  SendMessageExecution
	getMessageExecution   GetMessageExecution
}

// NewGrpcServer builds a PlugConnectorServer that starts tasks through
// taskExecutionFunction and forwards message RPCs to sendMessageExecution and
// getMessageExecution. The database handle is passed on to every started task.
func NewGrpcServer(database database.OtterSpace, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
	return &server{
		ctx:                   make(map[string]context.CancelFunc),
		database:              database,
		taskExecutionFunction: taskExecutionFunction,
		sendMessageExecution:  sendMessageExecution,
		getMessageExecution:   getMessageExecution,
	}
}

// TaskStart generates a new task ID, launches the task execution function in a
// background goroutine and immediately reports the task as RUNNING.
//
// It returns an error only when no task ID could be generated. Errors from the
// task itself are logged by the background goroutine.
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
	id, err := gonanoid.New()
	if err != nil {
		return nil, err
	}

	var anthroveUser models.User
	anthroveUser.ID = models.AnthroveUserID(creation.UserId)

	// The request context is cancelled by gRPC as soon as this call returns,
	// which would stop the task right away. Each task therefore gets its own
	// context, whose cancel function is stored under the task ID so the task
	// can be cancelled later.
	taskCtx, cancel := context.WithCancel(context.Background())
	s.mu.Lock()
	s.ctx[id] = cancel
	s.mu.Unlock()

	go func() {
		// FIXME: works for now, but this needs a cleaner implementation.
		err := s.taskExecutionFunction(taskCtx, s.database, creation.UserSourceName, anthroveUser, creation.DeepScrape, creation.ApiKey, func() {
			s.removeTask(id)
		})
		if err != nil {
			log.Print(err)
			// A failed task must not keep its context alive or keep being
			// reported as RUNNING. removeTask is a no-op if the callback
			// above already removed the entry.
			s.removeTask(id)
		}
	}()

	return &gRPC.PlugTaskStatus{
		TaskId:    id,
		TaskState: gRPC.PlugTaskState_RUNNING,
	}, nil
}

// TaskStatus reports RUNNING when the task ID is currently registered and
// UNKNOWN otherwise. It never returns an error.
func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	id := task.TaskId

	s.mu.Lock()
	_, found := s.ctx[id]
	s.mu.Unlock()

	state := gRPC.PlugTaskState_RUNNING
	if !found {
		state = gRPC.PlugTaskState_UNKNOWN
	}
	return &gRPC.PlugTaskStatus{
		TaskId:    id,
		TaskState: state,
	}, nil
}

// TaskCancel cancels the task with the given ID, if it is registered, and
// reports it as STOPPED. Unknown IDs are reported as STOPPED as well.
func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	id := task.TaskId
	s.removeTask(id)
	return &gRPC.PlugTaskStatus{
		TaskId:    id,
		TaskState: gRPC.PlugTaskState_STOPPED,
	}, nil
}

// removeTask cancels the context registered for taskID and forgets the task.
// It does nothing when the ID is not registered, so it is safe to call more
// than once for the same task.
func (s *server) removeTask(taskID string) {
	s.mu.Lock()
	cancel, exists := s.ctx[taskID]
	delete(s.ctx, taskID)
	s.mu.Unlock()

	// Cancel outside the critical section; only the map needs the lock.
	if exists {
		cancel()
	}
}
// Ping echoes the request message back to the caller together with the
// server's current time. It never returns an error.
func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) {
	pong := new(gRPC.PongResponse)
	pong.Message = request.Message
	pong.Timestamp = timestamppb.Now()
	return pong, nil
}
// SendMessage hands the request's user source ID, user source name and message
// to the configured send function. It returns a response with Success set to
// true when that function succeeds, and a nil response plus the error otherwise.
func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
	if err := s.sendMessageExecution(ctx, request.UserSourceId, request.UserSourceName, request.Message); err != nil {
		return nil, err
	}
	return &gRPC.SendMessageResponse{Success: true}, nil
}
// GetUserMessages fetches messages through the configured get function, using
// the request's user source ID and name, and converts them to gRPC messages.
//
// Every converted message carries the request's user source ID and name in its
// From* fields; creation time, body and title are copied from the fetched
// message. A nil response and the error are returned when fetching fails.
func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
	fetched, err := s.getMessageExecution(ctx, request.UserSourceId, request.UserSourceName)
	if err != nil {
		return nil, err
	}

	var converted []*gRPC.Message
	for _, msg := range fetched {
		entry := &gRPC.Message{
			FromUserSourceId:   request.UserSourceId,
			FromUserSourceName: request.UserSourceName,
			CreatedAt:          msg.CreatedAt,
			Body:               msg.Body,
			Title:              msg.Title,
		}
		converted = append(converted, entry)
	}

	return &gRPC.GetMessagesResponse{Messages: converted}, nil
}