From f82e4f70ac79ce3a9def696cc8cebed7d3fec89e Mon Sep 17 00:00:00 2001 From: David Janowski Date: Mon, 26 Aug 2024 15:42:42 +0200 Subject: [PATCH] feat(telemetry): initial added first draft of tracing and logging, may need to be improved --- pkg/plug/grpc.go | 109 ++++++++++++++++++++++------------------------- 1 file changed, 51 insertions(+), 58 deletions(-) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 5bbc8ae..ad009f2 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -2,14 +2,15 @@ package plug import ( "context" - "log" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models" - "google.golang.org/protobuf/types/known/timestamppb" - gRPC "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc" gonanoid "github.com/matoous/go-nanoid/v2" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type server struct { @@ -32,41 +33,64 @@ func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, se } func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) { + ctx, span := tracer.Start(ctx, "TaskStart") + defer span.End() + var plugTaskState gRPC.PlugTaskStatus id, err := gonanoid.New() if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } + span.AddEvent("Generated task ID", trace.WithAttributes(attribute.String("task_id", id))) plugTaskState.TaskId = id plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(creation.UserSourceId)) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } + span.AddEvent("Retrieved user source", trace.WithAttributes(attribute.String("user_source_id", creation.UserSourceId))) // gRPC closes the context after the call ended. So the whole scrapping stopped without waiting // by using this method we assign a new context to each new request we get. // This can be used for example to close the context with the given id - ctx, cancel := context.WithCancel(context.Background()) + taskCtx, cancel := context.WithCancel(context.Background()) s.ctx[id] = cancel + span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id))) + + log.WithContext(ctx).WithFields(log.Fields{ + "task_id": id, + "user_source_id": creation.UserSourceId, + }).Info("Starting task") go func() { - // FIXME: better implement this methode, works for now but needs refactoring - err := s.taskExecutionFunction(ctx, userSource, creation.DeepScrape, creation.ApiKey, func() { + err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() { s.removeTask(id) }) if err != nil { - log.Print(err) + log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully") + span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id))) } }() + span.SetAttributes(attribute.String("task_id", id)) return &plugTaskState, nil } -func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) { +func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) { + ctx, span := tracer.Start(ctx, "TaskStatus") + defer span.End() + var plugTaskState gRPC.PlugTaskStatus _, found := s.ctx[task.TaskId] @@ -77,17 +101,35 @@ func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugT if !found { plugTaskState.TaskState = gRPC.PlugTaskState_UNKNOWN } + span.AddEvent("Determined task state", trace.WithAttributes(attribute.String("task_id", task.TaskId), attribute.String("task_state", plugTaskState.TaskState.String()))) + + log.WithContext(ctx).WithFields(log.Fields{ + "task_id": task.TaskId, + "task_state": plugTaskState.TaskState, + }).Info("Task status requested") + + span.SetAttributes(attribute.String("task_id", task.TaskId)) return &plugTaskState, nil } -func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) { +func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) { + ctx, span := tracer.Start(ctx, "TaskCancel") + defer span.End() + var plugTaskState gRPC.PlugTaskStatus plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED plugTaskState.TaskId = task.TaskId s.removeTask(task.TaskId) + span.AddEvent("Removed task", trace.WithAttributes(attribute.String("task_id", task.TaskId))) + log.WithContext(ctx).WithFields(log.Fields{ + "task_id": task.TaskId, + "task_state": plugTaskState.TaskState, + }).Info("Task cancellation requested") + + span.SetAttributes(attribute.String("task_id", task.TaskId)) return &plugTaskState, nil } @@ -99,52 +141,3 @@ func (s *server) removeTask(taskID string) { fn() delete(s.ctx, taskID) } - -func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) { - response := gRPC.PongResponse{ - Message: request.Message, - Timestamp: timestamppb.Now(), - } - return &response, nil -} - -func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) { - messageResponse := gRPC.SendMessageResponse{Success: true} - - userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId)) - if err != nil { - return nil, err - } - - err = s.sendMessageExecution(ctx, userSource, request.Message) - if err != nil { - return nil, err - } - - return &messageResponse, nil -} - -func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { - messageResponse := gRPC.GetMessagesResponse{} - - userSource, err := database.GetUserSourceByID(ctx, models.UserSourceID(request.UserSourceId)) - if err != nil { - return nil, err - } - - messages, err := s.getMessageExecution(ctx, userSource) - if err != nil { - return nil, err - } - - for _, message := range messages { - messageResponse.Messages = append(messageResponse.Messages, &gRPC.Message{ - FromUserSourceId: request.UserSourceId, - CreatedAt: message.CreatedAt, - Body: message.Body, - Title: message.Title, - }) - } - - return &messageResponse, nil -}