package plug

import (
	"context"
	"log"
	"sync"

	"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
	"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
	gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
	gonanoid "github.com/matoous/go-nanoid/v2"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// server implements gRPC.PlugConnectorServer. It tracks the cancel function
// of every task it has started, keyed by task ID, and delegates the actual
// work to the execution callbacks supplied to NewGrpcServer.
type server struct {
	gRPC.UnimplementedPlugConnectorServer

	// mu guards ctx. The map is touched both by RPC handlers and by the
	// goroutines started in TaskStart, so unsynchronized access is a data race.
	mu  sync.Mutex
	ctx map[string]context.CancelFunc

	taskExecutionFunction TaskExecution
	sendMessageExecution  SendMessageExecution
	getMessageExecution   GetMessageExecution
	source                models.Source
}

// NewGrpcServer returns a PlugConnectorServer for the given source that uses
// the supplied callbacks to run tasks, send messages and fetch messages.
func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
	return &server{
		ctx:                   make(map[string]context.CancelFunc),
		taskExecutionFunction: taskExecutionFunction,
		sendMessageExecution:  sendMessageExecution,
		getMessageExecution:   getMessageExecution,
		source:                source,
	}
}

// TaskStart generates a new task ID, loads the user source referenced by the
// request and runs the task execution function in a background goroutine.
// It returns immediately with the new task ID and state RUNNING, or an error
// if the ID could not be generated or the user source could not be loaded.
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
	id, err := gonanoid.New()
	if err != nil {
		return nil, err
	}

	userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(creation.UserSourceId))
	if err != nil {
		return nil, err
	}

	// gRPC closes the request context once the call has ended, which would
	// stop the whole scraping run without waiting for it. Each task therefore
	// gets its own context, detached from the request; its cancel function is
	// stored under the task ID so the task can be cancelled later.
	taskCtx, cancel := context.WithCancel(context.Background())

	s.mu.Lock()
	s.ctx[id] = cancel
	s.mu.Unlock()

	go func() {
		// FIXME: better implement this method, works for now but needs refactoring
		done := func() { s.removeTask(id) }
		if err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, done); err != nil {
			log.Print(err)
			// A failed task may never have invoked done; release its context
			// and drop it from the map so it does not report RUNNING forever.
			// removeTask is a no-op if the task was already removed.
			s.removeTask(id)
		}
	}()

	return &gRPC.PlugTaskStatus{
		TaskId:    id,
		TaskState: gRPC.PlugTaskState_RUNNING,
	}, nil
}

// TaskStatus reports RUNNING when the task ID is currently tracked by the
// server and UNKNOWN otherwise.
func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	s.mu.Lock()
	_, found := s.ctx[task.TaskId]
	s.mu.Unlock()

	state := gRPC.PlugTaskState_UNKNOWN
	if found {
		state = gRPC.PlugTaskState_RUNNING
	}

	return &gRPC.PlugTaskStatus{
		TaskId:    task.TaskId,
		TaskState: state,
	}, nil
}

// TaskCancel cancels the context of the given task, if it is tracked, and
// stops tracking it. The returned state is STOPPED regardless of whether the
// task ID was known.
func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	s.removeTask(task.TaskId)

	return &gRPC.PlugTaskStatus{
		TaskId:    task.TaskId,
		TaskState: gRPC.PlugTaskState_STOPPED,
	}, nil
}

// removeTask cancels the context registered under taskID and removes the
// entry. It does nothing when the ID is not tracked, so it is safe to call
// more than once for the same task.
func (s *server) removeTask(taskID string) {
	s.mu.Lock()
	cancel, exists := s.ctx[taskID]
	if exists {
		delete(s.ctx, taskID)
	}
	s.mu.Unlock()

	// Cancel outside the critical section so the lock is never held while
	// running code that is not ours.
	if exists {
		cancel()
	}
}

// Ping echoes the request message back together with the current timestamp.
func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) {
	return &gRPC.PongResponse{
		Message:   request.Message,
		Timestamp: timestamppb.Now(),
	}, nil
}

// SendMessage loads the user source referenced by the request and passes it,
// with the request message, to the send-message callback. It returns a
// response with Success set to true, or the first error encountered.
func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
	userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId))
	if err != nil {
		return nil, err
	}

	if err := s.sendMessageExecution(ctx, userSource, request.Message); err != nil {
		return nil, err
	}

	return &gRPC.SendMessageResponse{Success: true}, nil
}

// GetUserMessages loads the user source referenced by the request, fetches
// its messages through the get-message callback and converts them into gRPC
// messages. The sender ID and name of every returned message are copied from
// the request.
func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
	userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId))
	if err != nil {
		return nil, err
	}

	messages, err := s.getMessageExecution(ctx, userSource)
	if err != nil {
		return nil, err
	}

	response := &gRPC.GetMessagesResponse{
		Messages: make([]*gRPC.Message, 0, len(messages)),
	}
	for _, message := range messages {
		response.Messages = append(response.Messages, &gRPC.Message{
			FromUserSourceId:   request.UserSourceId,
			FromUserSourceName: request.UserSourceName,
			CreatedAt:          message.CreatedAt,
			Body:               message.Body,
			Title:              message.Title,
		})
	}

	return response, nil
}