From 41882b9bfb50d086adb040c83b69421c8ff535e7 Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 25 Oct 2024 22:33:13 +0200 Subject: [PATCH 01/19] feat: Refactor task execution to use Plug interface and update algorithm implementation --- pkg/otter/connect.go | 12 ---- pkg/plug/algorithm.go | 129 ++++++++++++++++++++++++++++++++++++++++++ pkg/plug/grpc.go | 29 ++++++---- pkg/plug/server.go | 13 ++--- 4 files changed, 152 insertions(+), 31 deletions(-) delete mode 100644 pkg/otter/connect.go create mode 100644 pkg/plug/algorithm.go diff --git a/pkg/otter/connect.go b/pkg/otter/connect.go deleted file mode 100644 index cfcf079..0000000 --- a/pkg/otter/connect.go +++ /dev/null @@ -1,12 +0,0 @@ -package otter - -import ( - "context" - - "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" - "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" -) - -func ConnectToDatabase(ctx context.Context, config models.DatabaseConfig) error { - return database.Connect(ctx, config) -} diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go new file mode 100644 index 0000000..10b5bd4 --- /dev/null +++ b/pkg/plug/algorithm.go @@ -0,0 +1,129 @@ +package plug + +import ( + "context" + "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "gorm.io/gorm" +) + +type User struct { + userFavoriteCount int64 + userName string + userID string +} + +type Favorites struct { + posts []models.Post + nextPage string + lastPage string +} + +type Plug interface { + // GetFavoritePage + // The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used + GetFavoritePage(ctx context.Context, apiKey string, userSource models.UserSource, pageIdentifier string) (Favorites, error) + + // GetUserProfile + // The API Key can be an empty string if it's not supplied by the user, in this case the default API Key should be used + 
GetUserProfile(ctx context.Context, apiKey string, userSource models.UserSource) (User, error) +} + +func Algorithm(ctx context.Context, plugInterface Plug, db *gorm.DB, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error) { + ctx, span := tracer.Start(ctx, "mainScrapeAlgorithm") + defer span.End() + + span.SetAttributes( + attribute.String("user_source_id", string(userSource.ID)), + attribute.String("user_source_user_id", string(userSource.UserID)), + attribute.String("user_source_source_id", string(userSource.SourceID)), + ) + + basicLoggingInfo := log.Fields{ + "user_source_id": userSource.ID, + "user_source_user_id": userSource.UserID, + "user_source_source_id": userSource.SourceID, + } + + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Starting mainScrapeAlgorithm") + + taskSummery := TaskSummery{ + AddedPosts: 0, + DeletedPosts: 0, + } + + anthroveUserFavCount, err := getUserFavoriteCountFromDatabase(ctx, db, userSource.UserID, userSource.ID) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to get user favorite count from db") + return taskSummery, err + } + + profile, err := plugInterface.GetUserProfile(ctx, apiKey, userSource) + if err != nil { + return taskSummery, err + } + + nextPage := "" + +outer: + for anthroveUserFavCount < profile.userFavoriteCount { + select { + case <-ctx.Done(): + break outer + default: + span.AddEvent("Executing getFavorites request") + + favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) + span.AddEvent("Finished executing getFavorites request") + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") + return taskSummery, err + } + + if len(favorites.posts) <= 0 { + span.AddEvent("No more favorites found") 
+ log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + break outer + } + + summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.posts) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") + return taskSummery, err + } + + if summery.AddedFavorites != int64(len(favorites.posts)) { + span.AddEvent("user has no more favorites to add") + break outer + } + + nextPage = favorites.nextPage + taskSummery.AddedPosts += int(summery.AddedFavorites) + } + } + + span.AddEvent("Completed scraping algorithm") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Completed scraping algorithm") + return taskSummery, nil +} + +// getUserFavoriteCountFromDatabase +func getUserFavoriteCountFromDatabase(ctx context.Context, gorm *gorm.DB, userID models.UserID, userSourceID models.UserSourceID) (int64, error) { + var count int64 + + err := gorm.WithContext(ctx).Model(&models.UserFavorite{}).Where("user_id = ? 
AND user_source_id = ?", userID, userSourceID).Count(&count).Error + if err != nil { + return count, err + } + + return count, nil +} diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index e7ec2e6..379671a 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -17,20 +17,20 @@ import ( type server struct { gRPC.UnimplementedPlugConnectorServer - ctx map[string]context.CancelFunc - taskExecutionFunction TaskExecution - sendMessageExecution SendMessageExecution - getMessageExecution GetMessageExecution - source models.Source + ctx map[string]context.CancelFunc + plugInterface Plug + sendMessageExecution SendMessageExecution + getMessageExecution GetMessageExecution + source models.Source } -func NewGrpcServer(source models.Source, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer { +func NewGrpcServer(source models.Source, plugAPIInterface Plug, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer { return &server{ - ctx: make(map[string]context.CancelFunc), - taskExecutionFunction: taskExecutionFunction, - sendMessageExecution: sendMessageExecution, - getMessageExecution: getMessageExecution, - source: source, + ctx: make(map[string]context.CancelFunc), + plugInterface: plugAPIInterface, + sendMessageExecution: sendMessageExecution, + getMessageExecution: getMessageExecution, + source: source, } } @@ -116,7 +116,12 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) go func() { var err error - taskSummery, err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey) + gorm, err := database.GetGorm(taskCtx) + log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Failed to get Gorm client") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + taskSummery, err := Algorithm(taskCtx, s.plugInterface, gorm, userSource, 
creation.DeepScrape, creation.ApiKey) if err != nil { log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") span.RecordError(err) diff --git a/pkg/plug/server.go b/pkg/plug/server.go index 24d3b80..6e329c6 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -28,14 +28,13 @@ type TaskSummery struct { DeletedPosts int } -type TaskExecution func(ctx context.Context, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error) type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error) var ( - taskExecutionFunction TaskExecution - sendMessageExecution SendMessageExecution - getMessageExecution GetMessageExecution + sendMessageExecution SendMessageExecution + getMessageExecution GetMessageExecution + plugAPIInterface Plug ) func Listen(ctx context.Context, listenAddr string, source models.Source) error { @@ -78,7 +77,7 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error grpc.StatsHandler(otelgrpc.NewServerHandler()), ) - pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution)) + pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, plugAPIInterface, sendMessageExecution, getMessageExecution)) go func() { err = grpcServer.Serve(lis) @@ -98,8 +97,8 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error return nil } -func SetTaskExecutionFunction(function TaskExecution) { - taskExecutionFunction = function +func SetTaskExecutionFunction(plugInterface Plug) { + plugAPIInterface = plugInterface } func SetSendMessageExecutionFunction(function SendMessageExecution) { -- 2.45.2 From 107a3170954650a42253d5384e9fc8f3f579f45f Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 25 Oct 2024 22:46:22 +0200 Subject: [PATCH 02/19] fix: 
Capitalize struct fields for public access --- pkg/plug/algorithm.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index 10b5bd4..1950c54 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -10,15 +10,15 @@ import ( ) type User struct { - userFavoriteCount int64 - userName string - userID string + UserFavoriteCount int64 + UserName string + UserID string } type Favorites struct { - posts []models.Post - nextPage string - lastPage string + Posts []models.Post + NextPage string + LastPage string } type Plug interface { @@ -70,7 +70,7 @@ func Algorithm(ctx context.Context, plugInterface Plug, db *gorm.DB, userSource nextPage := "" outer: - for anthroveUserFavCount < profile.userFavoriteCount { + for anthroveUserFavCount < profile.UserFavoriteCount { select { case <-ctx.Done(): break outer @@ -87,13 +87,13 @@ outer: return taskSummery, err } - if len(favorites.posts) <= 0 { + if len(favorites.Posts) <= 0 { span.AddEvent("No more favorites found") log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") break outer } - summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.posts) + summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -101,12 +101,12 @@ outer: return taskSummery, err } - if summery.AddedFavorites != int64(len(favorites.posts)) { + if summery.AddedFavorites != int64(len(favorites.Posts)) { span.AddEvent("user has no more favorites to add") break outer } - nextPage = favorites.nextPage + nextPage = favorites.NextPage taskSummery.AddedPosts += int(summery.AddedFavorites) } } -- 2.45.2 From 408c97743256db480e86b52b9743c14e3b5febff Mon Sep 17 00:00:00 2001 From: SoXX Date: Sat, 26 Oct 2024 20:30:12 +0200 Subject: [PATCH 03/19] fix: nil pointer for DB --- pkg/plug/algorithm.go | 23 +--------------------- 
pkg/plug/grpc.go | 45 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index 1950c54..fc29956 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -6,7 +6,6 @@ import ( log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "gorm.io/gorm" ) type User struct { @@ -31,7 +30,7 @@ type Plug interface { GetUserProfile(ctx context.Context, apiKey string, userSource models.UserSource) (User, error) } -func Algorithm(ctx context.Context, plugInterface Plug, db *gorm.DB, userSource models.UserSource, deepScrape bool, apiKey string) (TaskSummery, error) { +func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSource, anthroveUserFavCount int64, deepScrape bool, apiKey string) (TaskSummery, error) { ctx, span := tracer.Start(ctx, "mainScrapeAlgorithm") defer span.End() @@ -54,14 +53,6 @@ func Algorithm(ctx context.Context, plugInterface Plug, db *gorm.DB, userSource DeletedPosts: 0, } - anthroveUserFavCount, err := getUserFavoriteCountFromDatabase(ctx, db, userSource.UserID, userSource.ID) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to get user favorite count from db") - return taskSummery, err - } - profile, err := plugInterface.GetUserProfile(ctx, apiKey, userSource) if err != nil { return taskSummery, err @@ -115,15 +106,3 @@ outer: log.WithContext(ctx).WithFields(basicLoggingInfo).Info("Completed scraping algorithm") return taskSummery, nil } - -// getUserFavoriteCountFromDatabase -func getUserFavoriteCountFromDatabase(ctx context.Context, gorm *gorm.DB, userID models.UserID, userSourceID models.UserSourceID) (int64, error) { - var count int64 - - err := gorm.WithContext(ctx).Model(&models.UserFavorite{}).Where("user_id = ? 
AND user_source_id = ?", userID, userSourceID).Count(&count).Error - if err != nil { - return count, err - } - - return count, nil -} diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 379671a..fd45029 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -3,6 +3,7 @@ package plug import ( "context" "errors" + "gorm.io/gorm" "time" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" @@ -114,14 +115,40 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) "user_source_id": creation.UserSourceId, }).Debug("Starting task") + db, err := database.GetGorm(taskCtx) + if err != nil { + log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + dberr := database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{ + ScrapeTaskID: models.ScrapeTaskID(id), + UserSourceID: userSource.ID, + FinishedAt: time.Now(), + Error: errorString(err), + }) + return &plugTaskState, errors.Join(err, dberr) + } + + anthroveUserFavCount, err := getUserFavoriteCountFromDatabase(taskCtx, db, userSource.UserID, userSource.ID) + if err != nil { + log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + dberr := database.UpdateScrapeHistory(taskCtx, models.ScrapeHistory{ + ScrapeTaskID: models.ScrapeTaskID(id), + UserSourceID: userSource.ID, + FinishedAt: time.Now(), + Error: errorString(err), + }) + return &plugTaskState, errors.Join(err, dberr) + } + go func() { - var err error - gorm, err := database.GetGorm(taskCtx) log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Failed to get Gorm client") span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - taskSummery, err := Algorithm(taskCtx, s.plugInterface, gorm, userSource, creation.DeepScrape, creation.ApiKey) + taskSummery, err := algorithm(taskCtx, 
s.plugInterface, userSource, anthroveUserFavCount, creation.DeepScrape, creation.ApiKey) if err != nil { log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Task execution failed") span.RecordError(err) @@ -306,3 +333,15 @@ func errorString(err error) string { } return "" } + +// getUserFavoriteCountFromDatabase +func getUserFavoriteCountFromDatabase(ctx context.Context, gorm *gorm.DB, userID models.UserID, userSourceID models.UserSourceID) (int64, error) { + var count int64 + + err := gorm.WithContext(ctx).Model(&models.UserFavorite{}).Where("user_id = ? AND user_source_id = ?", userID, userSourceID).Count(&count).Error + if err != nil { + return count, err + } + + return count, nil +} -- 2.45.2 From 36d6ecd873f01d7d52cd2753867f0d5278a15549 Mon Sep 17 00:00:00 2001 From: SoXX Date: Sat, 26 Oct 2024 20:36:43 +0200 Subject: [PATCH 04/19] fix: Remove redundant error logging and tracing in TaskStart function --- pkg/plug/grpc.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index fd45029..2099747 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -144,9 +144,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) } go func() { - log.WithContext(taskCtx).WithError(err).WithField("task_id", id).Error("Failed to get Gorm client") - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) + var err error taskSummery, err := algorithm(taskCtx, s.plugInterface, userSource, anthroveUserFavCount, creation.DeepScrape, creation.ApiKey) if err != nil { -- 2.45.2 From 376d7ba75d8a69187b0fe3793d6ab7775573e484 Mon Sep 17 00:00:00 2001 From: SoXX Date: Sat, 26 Oct 2024 21:15:02 +0200 Subject: [PATCH 05/19] feat: implemented generic algorithm --- README.md | 4 ++-- go.mod | 2 +- pkg/plug/grpc.go | 2 +- pkg/plug/otlp.go | 2 +- pkg/plug/server.go | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index e664d62..b789a1c 
100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Anthrove Plug SDK is a Golang-based Software Development Kit (SDK) that provides To install the Anthrove Plug SDK, you will need to have Go installed on your system. You can then use the go get command to fetch the SDK: ```bash -go get git.anthrove.art/Anthrove/plug-sdk/v4 +go get git.anthrove.art/Anthrove/plug-sdk/v5 ``` ## Usage @@ -141,7 +141,7 @@ func main() { log.Fatal(err) } - plug.SetTaskExecutionFunction(service.YourTaskFunction) + plug.SetTaskExecutionFunction(service.plugInterface) plug.SetGetMessageExecutionFunction(service.YourMessageFunction) err = plug.Listen(ctx, ":8080", source) if err != nil { diff --git a/go.mod b/go.mod index fc0bae3..456d5a4 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module git.anthrove.art/Anthrove/plug-sdk/v4 +module git.anthrove.art/Anthrove/plug-sdk/v5 go 1.23.0 diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 2099747..c0a34c1 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -8,7 +8,7 @@ import ( "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" - gRPC "git.anthrove.art/Anthrove/plug-sdk/v4/pkg/grpc" + gRPC "git.anthrove.art/Anthrove/plug-sdk/v5/pkg/grpc" gonanoid "github.com/matoous/go-nanoid/v2" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" diff --git a/pkg/plug/otlp.go b/pkg/plug/otlp.go index 670e40e..9d33eae 100644 --- a/pkg/plug/otlp.go +++ b/pkg/plug/otlp.go @@ -3,7 +3,7 @@ package plug import ( "context" - "git.anthrove.art/Anthrove/plug-sdk/v4/pkg/telemetry" + "git.anthrove.art/Anthrove/plug-sdk/v5/pkg/telemetry" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/codes" ) diff --git a/pkg/plug/server.go b/pkg/plug/server.go index 6e329c6..f01026e 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -5,7 +5,7 @@ import ( "net" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" - pb "git.anthrove.art/Anthrove/plug-sdk/v4/pkg/grpc" + 
pb "git.anthrove.art/Anthrove/plug-sdk/v5/pkg/grpc" log "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" @@ -15,7 +15,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v4/pkg/plug") +var tracer = otel.Tracer("git.anthrove.art/Anthrove/plug-sdk/v5/pkg/plug") type Message struct { Title string -- 2.45.2 From 1435ae8ea457fe92aae3ce4f99fe7d3bdac0fb2b Mon Sep 17 00:00:00 2001 From: SoXX Date: Sat, 26 Oct 2024 21:31:27 +0200 Subject: [PATCH 06/19] refactor: Rename SetTaskExecutionFunction to RegisterPlugInterface for clarity and add docstring --- pkg/plug/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/plug/server.go b/pkg/plug/server.go index f01026e..4663cf0 100644 --- a/pkg/plug/server.go +++ b/pkg/plug/server.go @@ -97,7 +97,8 @@ func Listen(ctx context.Context, listenAddr string, source models.Source) error return nil } -func SetTaskExecutionFunction(plugInterface Plug) { +// RegisterPlugInterface sets the provided Plug interface implementation for task execution. 
+func RegisterPlugInterface(plugInterface Plug) { plugAPIInterface = plugInterface } -- 2.45.2 From b2db0664d6ce468f4f4afac7f8fe55d0cafc97e7 Mon Sep 17 00:00:00 2001 From: SoXX Date: Sat, 26 Oct 2024 23:13:24 +0200 Subject: [PATCH 07/19] feat: start implementing new algorithm --- pkg/plug/algorithm.go | 96 ++++++++++++++++++++++++++++--------------- pkg/plug/scrape.go | 46 +++++---------------- 2 files changed, 73 insertions(+), 69 deletions(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index fc29956..9a8c0ce 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -2,10 +2,13 @@ package plug import ( "context" + "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "slices" ) type User struct { @@ -59,47 +62,72 @@ func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSo } nextPage := "" + var newPosts []models.Post + var anthroveFaves []models.UserFavorite outer: - for anthroveUserFavCount < profile.UserFavoriteCount { - select { - case <-ctx.Done(): - break outer - default: - span.AddEvent("Executing getFavorites request") - - favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) - span.AddEvent("Finished executing getFavorites request") - - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") - return taskSummery, err - } - - if len(favorites.Posts) <= 0 { - span.AddEvent("No more favorites found") - log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + for { + for anthroveUserFavCount < profile.UserFavoriteCount { + select { + case <-ctx.Done(): break outer - } + default: - summery, err := 
BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") - return taskSummery, err - } + span.AddEvent("Executing getFavorites request") + favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) + span.AddEvent("Finished executing getFavorites request") + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") + return taskSummery, err + } - if summery.AddedFavorites != int64(len(favorites.Posts)) { - span.AddEvent("user has no more favorites to add") - break outer - } + if len(favorites.Posts) == 0 { + span.AddEvent("No more favorites found") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + break outer + } - nextPage = favorites.NextPage - taskSummery.AddedPosts += int(summery.AddedFavorites) + summery := BatchSummery{} + newPosts, anthroveFaves, summery, err = BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") + return taskSummery, err + } + + nextPage = favorites.NextPage + taskSummery.AddedPosts += int(summery.AddedFavorites) + } } + break outer + } + + if len(newPosts) > 0 { + err = database.CreatePostInBatch(ctx, newPosts, BatchSize) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch") + return taskSummery, err + } + span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) + 
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch") + } + + if len(anthroveFaves) > 0 { + slices.Reverse(anthroveFaves) + err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch") + return taskSummery, err + } + span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) + log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch") } span.AddEvent("Completed scraping algorithm") diff --git a/pkg/plug/scrape.go b/pkg/plug/scrape.go index bdc9807..cac3b4f 100644 --- a/pkg/plug/scrape.go +++ b/pkg/plug/scrape.go @@ -3,6 +3,7 @@ package plug import ( "context" "slices" + "time" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" @@ -21,12 +22,7 @@ type BatchSummery struct { AddedFavorites int64 } -func BatchPostProcessing(ctx context.Context, userSource models.UserSource, posts []models.Post) error { - _, err := BatchPostProcessingWithSummery(ctx, userSource, posts) - return err -} - -func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) (BatchSummery, error) { +func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, BatchSummery, error) { ctx, span := tracer.Start(ctx, "BatchPostProcessing") defer span.End() @@ -49,7 +45,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB") - return BatchSummery{}, err + return nil, nil, BatchSummery{}, err } 
postIDs := make([]string, 0, len(posts)) @@ -64,7 +60,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts") - return BatchSummery{}, err + return nil, nil, BatchSummery{}, err } span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts)))) log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts") @@ -80,14 +76,14 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts") - return BatchSummery{}, err + return nil, nil, BatchSummery{}, err } span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs)))) log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts") anthroveFaves := make([]models.UserFavorite, 0, len(existingPosts)) newPosts := make([]models.Post, 0, len(existingPosts)) - for _, post := range posts { + for i, post := range posts { if !slices.ContainsFunc(existingPosts, func(reference models.PostReference) bool { found := reference.SourcePostID == post.References[0].SourcePostID if found { @@ -102,9 +98,13 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS return found }) { anthroveFaves = append(anthroveFaves, models.UserFavorite{ + BaseModel: models.BaseModel[models.UserFavoriteID]{ + CreatedAt: time.Now().Add(time.Millisecond * time.Duration(i) * -1), + }, UserID: userSource.UserID, PostID: post.ID, UserSourceID: 
userSource.ID, + UserSource: models.UserSource{}, }) newPosts = append(newPosts, post) } @@ -112,31 +112,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.AddEvent("Processed posts for favorites and new posts", trace.WithAttributes(attribute.Int("new_post_count", len(newPosts)), attribute.Int("new_fav_count", len(anthroveFaves)))) log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Processed posts for favorites and new posts") - if len(newPosts) > 0 { - err = database.CreatePostInBatch(ctx, newPosts, BatchSize) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch") - return BatchSummery{}, err - } - span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) - log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch") - } - - if len(anthroveFaves) > 0 { - err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch") - return BatchSummery{}, err - } - span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) - log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch") - } - - return BatchSummery{ + return newPosts, anthroveFaves, BatchSummery{ AddedPosts: int64(len(newPosts)), AddedFavorites: int64(len(anthroveFaves)), }, nil -- 2.45.2 From 44137a7251bb1bd142e11d7d18b9c2cae84303eb Mon Sep 17 00:00:00 2001 From: SoXX Date: Mon, 28 Oct 2024 21:08:54 +0100 Subject: [PATCH 08/19] feat: Add tracing events and logging for batch processing steps --- pkg/plug/algorithm.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git 
a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index 9a8c0ce..ed77874 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -72,7 +72,7 @@ outer: case <-ctx.Done(): break outer default: - + summery := BatchSummery{} span.AddEvent("Executing getFavorites request") favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) span.AddEvent("Finished executing getFavorites request") @@ -89,8 +89,9 @@ outer: break outer } - summery := BatchSummery{} + span.AddEvent("Executing BatchPostProcessingWithSummery") newPosts, anthroveFaves, summery, err = BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) + span.AddEvent("Finished executing BatchPostProcessingWithSummery") if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -98,6 +99,12 @@ outer: return taskSummery, err } + if len(anthroveFaves) > 0 { + span.AddEvent("No more favorites found to add") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + break outer + } + nextPage = favorites.NextPage taskSummery.AddedPosts += int(summery.AddedFavorites) } @@ -106,6 +113,7 @@ outer: } if len(newPosts) > 0 { + span.AddEvent("Executing CreatePostInBatch") err = database.CreatePostInBatch(ctx, newPosts, BatchSize) if err != nil { span.RecordError(err) @@ -118,6 +126,8 @@ outer: } if len(anthroveFaves) > 0 { + span.AddEvent("Executing CreateUserFavoriteInBatch") + slices.Reverse(anthroveFaves) err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize) if err != nil { -- 2.45.2 From 263d65adc1efb900299fd9d24dc4ad2874e3e2f5 Mon Sep 17 00:00:00 2001 From: SoXX Date: Mon, 28 Oct 2024 21:29:25 +0100 Subject: [PATCH 09/19] fix: Fix conditional check for anthroveFaves length --- pkg/plug/algorithm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index ed77874..c2d0618 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -99,7 +99,7 @@ 
outer: return taskSummery, err } - if len(anthroveFaves) > 0 { + if len(anthroveFaves) < 0 { span.AddEvent("No more favorites found to add") log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") break outer -- 2.45.2 From 16846c59c44ccff3c61e36240acb14fc899be974 Mon Sep 17 00:00:00 2001 From: SoXX Date: Mon, 28 Oct 2024 21:33:45 +0100 Subject: [PATCH 10/19] fix: variable scope and append results in BatchPostProcessing --- pkg/plug/algorithm.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index c2d0618..4c2c40b 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -90,7 +90,11 @@ outer: } span.AddEvent("Executing BatchPostProcessingWithSummery") - newPosts, anthroveFaves, summery, err = BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) + pageNewPosts, pageAnthroveFaves, summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) + + anthroveFaves = append(anthroveFaves, pageAnthroveFaves...) + newPosts = append(newPosts, pageNewPosts...) 
+ span.AddEvent("Finished executing BatchPostProcessingWithSummery") if err != nil { span.RecordError(err) -- 2.45.2 From 799f943650b543dd38be4a506fe04a258368bba2 Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 21:06:03 +0100 Subject: [PATCH 11/19] refactor: improve readability and efficiency - Remove nested for loop and redundant checks - Streamline error handling and logging - Rename getAnthrovePost to getAnthrovePostReferences for clarity - Simplify BatchPostProcessingWithSummery to return only necessary values --- pkg/plug/algorithm.go | 87 ++++++++++++++++++++++--------------------- pkg/plug/scrape.go | 27 ++++++-------- 2 files changed, 56 insertions(+), 58 deletions(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index 4c2c40b..f87a32a 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -67,51 +67,52 @@ func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSo outer: for { - for anthroveUserFavCount < profile.UserFavoriteCount { - select { - case <-ctx.Done(): - break outer - default: - summery := BatchSummery{} - span.AddEvent("Executing getFavorites request") - favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) - span.AddEvent("Finished executing getFavorites request") - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") - return taskSummery, err - } + if anthroveUserFavCount < profile.UserFavoriteCount && profile.UserFavoriteCount > 0 { + break outer + } - if len(favorites.Posts) == 0 { - span.AddEvent("No more favorites found") - log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") - break outer - } - - span.AddEvent("Executing BatchPostProcessingWithSummery") - pageNewPosts, pageAnthroveFaves, summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) - - anthroveFaves = 
append(anthroveFaves, pageAnthroveFaves...) - newPosts = append(newPosts, pageNewPosts...) - - span.AddEvent("Finished executing BatchPostProcessingWithSummery") - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") - return taskSummery, err - } - - if len(anthroveFaves) < 0 { - span.AddEvent("No more favorites found to add") - log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") - break outer - } - - nextPage = favorites.NextPage - taskSummery.AddedPosts += int(summery.AddedFavorites) + select { + case <-ctx.Done(): + break outer + default: + span.AddEvent("Executing getFavorites request") + favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) + span.AddEvent("Finished executing getFavorites request") + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") + return taskSummery, err } + + if len(favorites.Posts) == 0 { + span.AddEvent("No more favorites found") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + break outer + } + + span.AddEvent("Executing BatchPostProcessingWithSummery") + pageNewPosts, pageAnthroveFaves, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) + + anthroveFaves = append(anthroveFaves, pageAnthroveFaves...) + newPosts = append(newPosts, pageNewPosts...) 
+ + span.AddEvent("Finished executing BatchPostProcessingWithSummery") + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") + return taskSummery, err + } + + if len(anthroveFaves) <= 0 || len(favorites.Posts) != len(anthroveFaves) { + span.AddEvent("No more favorites found to add") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + break outer + } + + nextPage = favorites.NextPage + taskSummery.AddedPosts += len(pageAnthroveFaves) } break outer } diff --git a/pkg/plug/scrape.go b/pkg/plug/scrape.go index cac3b4f..ac6680e 100644 --- a/pkg/plug/scrape.go +++ b/pkg/plug/scrape.go @@ -22,7 +22,7 @@ type BatchSummery struct { AddedFavorites int64 } -func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, BatchSummery, error) { +func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, error) { ctx, span := tracer.Start(ctx, "BatchPostProcessing") defer span.End() @@ -55,18 +55,18 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.AddEvent("Collected post IDs", trace.WithAttributes(attribute.Int("post_count", len(postIDs)))) log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("post_count", len(postIDs)).Info("Collected post IDs") - existingPosts, err := getAnthrovePost(ctx, db, userSource.SourceID, postIDs) + existingPostsReferences, err := getAnthrovePostReferences(ctx, db, userSource.SourceID, postIDs) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts") return nil, nil, BatchSummery{}, err } - span.AddEvent("Fetched existing posts", 
trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts)))) - log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts") + span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPostsReferences)))) + log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPostsReferences)).Info("Fetched existing posts") var existingPostIDs []models.PostID - for _, post := range existingPosts { + for _, post := range existingPostsReferences { existingPostIDs = append(existingPostIDs, models.PostID(post.PostID)) } @@ -81,10 +81,10 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs)))) log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts") - anthroveFaves := make([]models.UserFavorite, 0, len(existingPosts)) - newPosts := make([]models.Post, 0, len(existingPosts)) + anthroveFaves := make([]models.UserFavorite, 0, len(existingPostsReferences)) + newPosts := make([]models.Post, 0, len(existingPostsReferences)) for i, post := range posts { - if !slices.ContainsFunc(existingPosts, func(reference models.PostReference) bool { + if !slices.ContainsFunc(existingPostsReferences, func(reference models.PostReference) bool { found := reference.SourcePostID == post.References[0].SourcePostID if found { if !slices.Contains(existingFavPostIDs, models.PostID(reference.PostID)) { @@ -112,17 +112,14 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.AddEvent("Processed posts for favorites and new posts", trace.WithAttributes(attribute.Int("new_post_count", len(newPosts)), attribute.Int("new_fav_count", 
len(anthroveFaves)))) log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Processed posts for favorites and new posts") - return newPosts, anthroveFaves, BatchSummery{ - AddedPosts: int64(len(newPosts)), - AddedFavorites: int64(len(anthroveFaves)), - }, nil + return newPosts, anthroveFaves, nil } -func getAnthrovePost(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) { - ctx, span := tracer.Start(ctx, "getAnthrovePost") +func getAnthrovePostReferences(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) { + ctx, span := tracer.Start(ctx, "getAnthrovePostReferences") defer span.End() - log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePost") + log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePostReferences") var existingPosts []models.PostReference err := gorm.WithContext(ctx).Model(models.PostReference{}).Find(&existingPosts, "source_id = ? 
AND source_post_id IN ?", id, postIDs).Error -- 2.45.2 From 996ec30de1e9882b5e74d3fd9bf3571c700af2d9 Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 21:10:13 +0100 Subject: [PATCH 12/19] fix: Remove unused BatchSummery return values in error cases --- pkg/plug/scrape.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/plug/scrape.go b/pkg/plug/scrape.go index ac6680e..f02e13f 100644 --- a/pkg/plug/scrape.go +++ b/pkg/plug/scrape.go @@ -45,7 +45,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB") - return nil, nil, BatchSummery{}, err + return nil, nil, err } postIDs := make([]string, 0, len(posts)) @@ -60,7 +60,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts") - return nil, nil, BatchSummery{}, err + return nil, nil, err } span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPostsReferences)))) log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPostsReferences)).Info("Fetched existing posts") @@ -76,7 +76,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts") - return nil, nil, BatchSummery{}, err + return nil, nil, err } span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs)))) 
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts") -- 2.45.2 From 9d950cf31d5c22f119212a43e6a9441e7e579e0c Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 21:15:29 +0100 Subject: [PATCH 13/19] fix: comparison logic in algorithm to correctly break the loop --- pkg/plug/algorithm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index f87a32a..ae074ea 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -67,7 +67,7 @@ func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSo outer: for { - if anthroveUserFavCount < profile.UserFavoriteCount && profile.UserFavoriteCount > 0 { + if anthroveUserFavCount >= profile.UserFavoriteCount && profile.UserFavoriteCount > 0 { break outer } -- 2.45.2 From 0c0d8451c63e346729ea85bfe176c20091541a28 Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 21:29:28 +0100 Subject: [PATCH 14/19] fix: Remove redundant break statement in taskSummery block --- pkg/plug/algorithm.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index ae074ea..48fd8c7 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -114,7 +114,6 @@ outer: nextPage = favorites.NextPage taskSummery.AddedPosts += len(pageAnthroveFaves) } - break outer } if len(newPosts) > 0 { -- 2.45.2 From 3da2501459acc7b52de5d2f3c032f3666d64d360 Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 21:29:28 +0100 Subject: [PATCH 15/19] Fix: variable name typo from anthroveFaves to pageAnthroveFaves --- pkg/plug/algorithm.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index ae074ea..3ea9fe5 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -105,7 +105,7 @@ outer: return taskSummery, err } - if len(anthroveFaves) <= 0 || 
len(favorites.Posts) != len(anthroveFaves) { + if len(pageAnthroveFaves) <= 0 || len(favorites.Posts) != len(pageAnthroveFaves) { span.AddEvent("No more favorites found to add") log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") break outer @@ -114,7 +114,6 @@ outer: nextPage = favorites.NextPage taskSummery.AddedPosts += len(pageAnthroveFaves) } - break outer } if len(newPosts) > 0 { -- 2.45.2 From 4d209f2a903bcfbddf6d2fc6c8df8f86334b6210 Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 22:14:29 +0100 Subject: [PATCH 16/19] fix: favorites creation timestamps and remove duplicate imports --- pkg/plug/algorithm.go | 6 ++++++ pkg/plug/scrape.go | 7 +------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index 3ea9fe5..c080abf 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "slices" + "time" ) type User struct { @@ -132,7 +133,12 @@ outer: if len(anthroveFaves) > 0 { span.AddEvent("Executing CreateUserFavoriteInBatch") + for i, fav := range anthroveFaves { + fav.CreatedAt = time.Now().Add(time.Millisecond * time.Duration(i) * -1) + } + slices.Reverse(anthroveFaves) + err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize) if err != nil { span.RecordError(err) diff --git a/pkg/plug/scrape.go b/pkg/plug/scrape.go index f02e13f..5f1262d 100644 --- a/pkg/plug/scrape.go +++ b/pkg/plug/scrape.go @@ -2,9 +2,6 @@ package plug import ( "context" - "slices" - "time" - "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" log "github.com/sirupsen/logrus" @@ -12,6 +9,7 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "gorm.io/gorm" + "slices" ) var BatchSize = 50 @@ -98,9 +96,6 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS return 
found }) { anthroveFaves = append(anthroveFaves, models.UserFavorite{ - BaseModel: models.BaseModel[models.UserFavoriteID]{ - CreatedAt: time.Now().Add(time.Millisecond * time.Duration(i) * -1), - }, UserID: userSource.UserID, PostID: post.ID, UserSourceID: userSource.ID, -- 2.45.2 From 282e769c1318f6dd9303b52f874f585d40593f1c Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 22:15:43 +0100 Subject: [PATCH 17/19] fix: unused variable warning in post-processing loop --- pkg/plug/scrape.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/plug/scrape.go b/pkg/plug/scrape.go index 5f1262d..4965241 100644 --- a/pkg/plug/scrape.go +++ b/pkg/plug/scrape.go @@ -81,7 +81,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS anthroveFaves := make([]models.UserFavorite, 0, len(existingPostsReferences)) newPosts := make([]models.Post, 0, len(existingPostsReferences)) - for i, post := range posts { + for _, post := range posts { if !slices.ContainsFunc(existingPostsReferences, func(reference models.PostReference) bool { found := reference.SourcePostID == post.References[0].SourcePostID if found { -- 2.45.2 From ffd3767f3213066540a47a8f95ce25ce09153ec8 Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 22:29:09 +0100 Subject: [PATCH 18/19] fix: error when userID is empty - an error could happen if the user does not have a source UserID/UserName set --- pkg/plug/algorithm.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index c080abf..09ad768 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -62,6 +62,9 @@ func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSo return taskSummery, err } + userSource.UserID = models.UserID(profile.UserID) + userSource.AccountUsername = profile.UserName + nextPage := "" var newPosts []models.Post var anthroveFaves []models.UserFavorite -- 2.45.2 From af0dc2ab0db6076babe53695251fe6f1f1fe7959
Mon Sep 17 00:00:00 2001 From: SoXX Date: Fri, 1 Nov 2024 22:34:43 +0100 Subject: [PATCH 19/19] fix: variable userSource.UserID to userSource.AccountID for clarity --- pkg/plug/algorithm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index 09ad768..8f2e382 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -62,7 +62,7 @@ func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSo return taskSummery, err } - userSource.UserID = models.UserID(profile.UserID) + userSource.AccountID = profile.UserID userSource.AccountUsername = profile.UserName nextPage := "" -- 2.45.2