From 9de22d31761387043724f39323ea88c64849a514 Mon Sep 17 00:00:00 2001 From: SoXX Date: Tue, 15 Oct 2024 16:08:34 +0200 Subject: [PATCH] feat: Add ScrapeHistory tracking and refactor task execution return type - Add `ScrapeHistory` record creation and update during task execution - Modify `TaskExecution` function to return `TaskSummery` instead of using a cancel function - Reintroduce and move `removeTask` function for cleanup after task execution --- pkg/plug/grpc.go | 53 +++++++++++++++++++++++++++++++++++----------- pkg/plug/server.go | 7 +++++- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 837d980..e53e1f9 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -3,6 +3,7 @@ package plug import ( "context" "errors" + "time" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" @@ -47,6 +48,21 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) } span.AddEvent("Generated task ID", trace.WithAttributes(attribute.String("task_id", id))) + scrapeTaskHistory := models.ScrapeHistory{ + ScrapeTaskID: models.ScrapeTaskID(id), + UserSourceID: models.UserSourceID(creation.UserSourceId), + } + scrapeTaskHistory, err = database.CreateScrapeHistory(ctx, scrapeTaskHistory) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.AddEvent("Creates ScrapeTaskHistory", trace.WithAttributes( + attribute.String("user_source_id", creation.UserSourceId), + attribute.String("scrape_task_id", id), + )) + plugTaskState.TaskId = id plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING @@ -84,9 +100,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) }).Debug("Starting task") go func() { - err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() { - s.removeTask(id) - }) + taskSummery, err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey) if err != nil { log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") span.RecordError(err) @@ -95,6 +109,21 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully") span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id))) } + + err = database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{ + ScrapeTaskID: models.ScrapeTaskID(id), + UserSourceID: userSource.ID, + FinishedAt: time.Now(), + Error: err.Error(), + AddedPosts: taskSummery.AddedPosts, + DeletedPosts: taskSummery.DeletedPosts, + }) + if err != nil { + log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + s.removeTask(id) }() span.SetAttributes(attribute.String("task_id", id)) @@ -147,15 +176,6 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu return &plugTaskState, nil } -func (s *server) removeTask(taskID string) { - fn, exists := s.ctx[taskID] - if !exists { - return - } - fn() - delete(s.ctx, taskID) -} - func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { ctx, span := tracer.Start(ctx, "GetUserMessages") defer span.End() @@ -248,3 +268,12 @@ func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongRe return &pong, nil } + +func (s *server) removeTask(taskID string) { + fn, exists := s.ctx[taskID] + if !exists { + return + } + fn() + delete(s.ctx, taskID) +} diff --git a/pkg/plug/server.go b/pkg/plug/server.go index 5c7e575..5707e3e 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -23,7 +23,12 @@ type Message struct { CreatedAt *timestamppb.Timestamp } -type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string, cancelFunction func()) error +type TaskSummery struct { + AddedPosts int + DeletedPosts int +} + +type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error) type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)