plug-sdk/pkg/plug/grpc.go
SoXX 76174e3247
All checks were successful
Gitea Build Check / Build (push) Successful in 51s
Gitea Build Check / Build (pull_request) Successful in 51s
feat(dependencies): update to latest version
updated OtterSpaceSDK to v4.0.0
2024-08-29 15:26:58 +02:00

158 lines
5.1 KiB
Go

package plug
import (
	"context"
	"errors"
	"sync"

	"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database"
	"git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models"
	gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
	gonanoid "github.com/matoous/go-nanoid/v2"
	log "github.com/sirupsen/logrus"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)
// server implements gRPC.PlugConnectorServer. It keeps the cancel function of
// every started task, keyed by task ID, so later requests can query or cancel
// the task.
type server struct {
	gRPC.UnimplementedPlugConnectorServer

	// mu guards ctx. The map is touched by the request handlers and by the
	// completion callback handed to the task function, which run on different
	// goroutines.
	mu sync.Mutex
	// ctx maps a task ID to the cancel function of that task's context.
	ctx map[string]context.CancelFunc

	taskExecutionFunction TaskExecution
	sendMessageExecution  SendMessageExecution
	getMessageExecution   GetMessageExecution
	source                models.Source
}

// NewGrpcServer returns a PlugConnectorServer for the given source.
// taskExecutionFunction is invoked for every task started through TaskStart;
// the two message callbacks and the source are stored on the server.
func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
	return &server{
		ctx:                   make(map[string]context.CancelFunc),
		taskExecutionFunction: taskExecutionFunction,
		sendMessageExecution:  sendMessageExecution,
		getMessageExecution:   getMessageExecution,
		source:                source,
	}
}

// TaskStart generates a task ID, loads the user source referenced by the
// request and, if its account is validated, runs the task execution function
// in a new goroutine.
//
// It returns a RUNNING status carrying the new task ID. If the ID cannot be
// generated or the user source cannot be loaded, it returns a nil status and
// the error. If the account is not validated, it returns a STOPPED status
// together with an error and no task is started.
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
	ctx, span := tracer.Start(ctx, "TaskStart")
	defer span.End()

	id, err := gonanoid.New()
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return nil, err
	}
	span.AddEvent("Generated task ID", trace.WithAttributes(attribute.String("task_id", id)))

	plugTaskState := &gRPC.PlugTaskStatus{
		TaskId:    id,
		TaskState: gRPC.PlugTaskState_RUNNING,
	}

	userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(creation.UserSourceId))
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		return nil, err
	}
	span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId)))

	if !userSource.AccountValidate {
		err = errors.New("user is not validated")
		log.WithContext(ctx).WithError(err).WithField("task_id", id).Error("Task execution failed")
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
		return plugTaskState, err
	}

	// gRPC cancels the request context as soon as the call returns, which
	// would stop the scraping immediately. Every task therefore gets a fresh
	// context that only carries over the trace ID of the request, and whose
	// cancel function is stored under the task ID so the task can be stopped
	// later.
	detached := trace.ContextWithSpanContext(
		context.Background(),
		trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()}),
	)
	taskCtx, cancel := context.WithCancel(detached)

	s.mu.Lock()
	s.ctx[id] = cancel
	s.mu.Unlock()

	span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id)))
	log.WithContext(taskCtx).WithFields(log.Fields{
		"task_id":        id,
		"user_source_id": creation.UserSourceId,
	}).Info("Starting task")

	go func() {
		// The request span is ended when TaskStart returns, so anything
		// recorded on it afterwards is lost. The outcome of the task is
		// recorded on a span of its own that lasts as long as the task
		// function runs.
		execCtx, execSpan := tracer.Start(taskCtx, "TaskExecution")
		defer execSpan.End()
		execSpan.SetAttributes(attribute.String("task_id", id))

		// The callback lets the task function unregister (and cancel) itself.
		err := s.taskExecutionFunction(execCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
			s.removeTask(id)
		})
		if err != nil {
			log.WithContext(execCtx).WithError(err).WithField("task_id", id).Error("Task execution failed")
			execSpan.RecordError(err)
			execSpan.SetStatus(codes.Error, err.Error())
			return
		}

		log.WithContext(execCtx).WithField("task_id", id).Info("Task completed successfully")
		execSpan.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
	}()

	span.SetAttributes(attribute.String("task_id", id))
	return plugTaskState, nil
}

// TaskStatus reports RUNNING if a cancel function is registered for the given
// task ID and UNKNOWN otherwise. It never returns an error.
func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	ctx, span := tracer.Start(ctx, "TaskStatus")
	defer span.End()

	s.mu.Lock()
	_, found := s.ctx[task.TaskId]
	s.mu.Unlock()

	plugTaskState := &gRPC.PlugTaskStatus{
		TaskId:    task.TaskId,
		TaskState: gRPC.PlugTaskState_RUNNING,
	}
	if !found {
		plugTaskState.TaskState = gRPC.PlugTaskState_UNKNOWN
	}

	span.AddEvent("Determined task state", trace.WithAttributes(attribute.String("task_id", task.TaskId), attribute.String("task_state", plugTaskState.TaskState.String())))
	log.WithContext(ctx).WithFields(log.Fields{
		"task_id":    task.TaskId,
		"task_state": plugTaskState.TaskState,
	}).Info("Task status requested")

	span.SetAttributes(attribute.String("task_id", task.TaskId))
	return plugTaskState, nil
}

// TaskCancel cancels and unregisters the task with the given ID. It always
// answers with a STOPPED status, including for IDs that are not registered,
// and never returns an error.
func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	ctx, span := tracer.Start(ctx, "TaskCancel")
	defer span.End()

	plugTaskState := &gRPC.PlugTaskStatus{
		TaskId:    task.TaskId,
		TaskState: gRPC.PlugTaskState_STOPPED,
	}

	s.removeTask(task.TaskId)
	span.AddEvent("Removed task", trace.WithAttributes(attribute.String("task_id", task.TaskId)))
	log.WithContext(ctx).WithFields(log.Fields{
		"task_id":    task.TaskId,
		"task_state": plugTaskState.TaskState,
	}).Info("Task cancellation requested")

	span.SetAttributes(attribute.String("task_id", task.TaskId))
	return plugTaskState, nil
}

// removeTask unregisters the task with the given ID and cancels its context.
// Unknown IDs are ignored, so calling it more than once for a task is safe.
func (s *server) removeTask(taskID string) {
	s.mu.Lock()
	cancel, exists := s.ctx[taskID]
	delete(s.ctx, taskID)
	s.mu.Unlock()

	// Cancel outside the lock so the mutex is only held for the map access.
	if exists {
		cancel()
	}
}