diff --git a/pkg/plug/algorithm.go b/pkg/plug/algorithm.go index fc29956..9a8c0ce 100644 --- a/pkg/plug/algorithm.go +++ b/pkg/plug/algorithm.go @@ -2,10 +2,13 @@ package plug import ( "context" + "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "slices" ) type User struct { @@ -59,47 +62,72 @@ func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSo } nextPage := "" + var newPosts []models.Post + var anthroveFaves []models.UserFavorite outer: - for anthroveUserFavCount < profile.UserFavoriteCount { - select { - case <-ctx.Done(): - break outer - default: - span.AddEvent("Executing getFavorites request") - - favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) - span.AddEvent("Finished executing getFavorites request") - - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") - return taskSummery, err - } - - if len(favorites.Posts) <= 0 { - span.AddEvent("No more favorites found") - log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + for { + for anthroveUserFavCount < profile.UserFavoriteCount { + select { + case <-ctx.Done(): break outer - } + default: - summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") - return taskSummery, err - } + span.AddEvent("Executing getFavorites request") + favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage) + span.AddEvent("Finished executing getFavorites request") + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page") + return taskSummery, err + } - if summery.AddedFavorites != int64(len(favorites.Posts)) { - span.AddEvent("user has no more favorites to add") - break outer - } + if len(favorites.Posts) == 0 { + span.AddEvent("No more favorites found") + log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found") + break outer + } - nextPage = favorites.NextPage - taskSummery.AddedPosts += int(summery.AddedFavorites) + summery := BatchSummery{} + newPosts, anthroveFaves, summery, err = BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing") + return taskSummery, err + } + + nextPage = favorites.NextPage + taskSummery.AddedPosts += int(summery.AddedFavorites) + } } + break outer + } + + if len(newPosts) > 0 { + err = database.CreatePostInBatch(ctx, newPosts, BatchSize) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch") + return taskSummery, err + } + span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) + log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch") + } + + if len(anthroveFaves) > 0 { + slices.Reverse(anthroveFaves) + err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch") + return taskSummery, err + } + span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) + log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch") } span.AddEvent("Completed scraping algorithm") diff --git a/pkg/plug/scrape.go b/pkg/plug/scrape.go index bdc9807..cac3b4f 100644 --- a/pkg/plug/scrape.go +++ b/pkg/plug/scrape.go @@ -3,6 +3,7 @@ package plug import ( "context" "slices" + "time" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/database" "git.anthrove.art/Anthrove/otter-space-sdk/v4/pkg/models" @@ -21,12 +22,7 @@ type BatchSummery struct { AddedFavorites int64 } -func BatchPostProcessing(ctx context.Context, userSource models.UserSource, posts []models.Post) error { - _, err := BatchPostProcessingWithSummery(ctx, userSource, posts) - return err -} - -func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) (BatchSummery, error) { +func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, BatchSummery, error) { ctx, span := tracer.Start(ctx, "BatchPostProcessing") defer span.End() @@ -49,7 +45,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to get Gorm DB") - return BatchSummery{}, err + return nil, nil, BatchSummery{}, err } postIDs := make([]string, 0, len(posts)) @@ -64,7 +60,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts") - return BatchSummery{}, err + return nil, nil, BatchSummery{}, err } span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts)))) log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts") @@ -80,14 +76,14 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.RecordError(err) span.SetStatus(codes.Error, err.Error()) log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing favorite posts") - return BatchSummery{}, err + return nil, nil, BatchSummery{}, err } span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs)))) log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts") anthroveFaves := make([]models.UserFavorite, 0, len(existingPosts)) newPosts := make([]models.Post, 0, len(existingPosts)) - for _, post := range posts { + for i, post := range posts { if !slices.ContainsFunc(existingPosts, func(reference models.PostReference) bool { found := reference.SourcePostID == post.References[0].SourcePostID if found { @@ -102,9 +98,13 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS return found }) { anthroveFaves = append(anthroveFaves, models.UserFavorite{ + BaseModel: models.BaseModel[models.UserFavoriteID]{ + CreatedAt: time.Now().Add(time.Millisecond * time.Duration(i) * -1), + }, UserID: userSource.UserID, PostID: post.ID, UserSourceID: userSource.ID, + UserSource: models.UserSource{}, }) newPosts = append(newPosts, post) } @@ -112,31 +112,7 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS span.AddEvent("Processed posts for favorites and new posts", trace.WithAttributes(attribute.Int("new_post_count", len(newPosts)), attribute.Int("new_fav_count", len(anthroveFaves)))) log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Processed posts for favorites and new posts") - if len(newPosts) > 0 { - err = database.CreatePostInBatch(ctx, newPosts, BatchSize) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithError(err).Error("Failed to create new posts in batch") - return BatchSummery{}, err - } - span.AddEvent("Created new posts in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) - log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created new posts in batch") - } - - if len(anthroveFaves) > 0 { - err = database.CreateUserFavoriteInBatch(ctx, anthroveFaves, BatchSize) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to create user favorites in batch") - return BatchSummery{}, err - } - span.AddEvent("Created user favorites in batch", trace.WithAttributes(attribute.Int("batch_size", BatchSize))) - log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Created user favorites in batch") - } - - return BatchSummery{ + return newPosts, anthroveFaves, BatchSummery{ AddedPosts: int64(len(newPosts)), AddedFavorites: int64(len(anthroveFaves)), }, nil