New and generic scrape algorithm #11
@ -67,51 +67,52 @@ func algorithm(ctx context.Context, plugInterface Plug, userSource models.UserSo
|
||||
|
||||
outer:
|
||||
for {
|
||||
for anthroveUserFavCount < profile.UserFavoriteCount {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break outer
|
||||
default:
|
||||
summery := BatchSummery{}
|
||||
span.AddEvent("Executing getFavorites request")
|
||||
favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage)
|
||||
span.AddEvent("Finished executing getFavorites request")
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page")
|
||||
return taskSummery, err
|
||||
}
|
||||
if anthroveUserFavCount < profile.UserFavoriteCount && profile.UserFavoriteCount > 0 {
|
||||
break outer
|
||||
}
|
||||
|
||||
if len(favorites.Posts) == 0 {
|
||||
span.AddEvent("No more favorites found")
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
|
||||
break outer
|
||||
}
|
||||
|
||||
span.AddEvent("Executing BatchPostProcessingWithSummery")
|
||||
pageNewPosts, pageAnthroveFaves, summery, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts)
|
||||
|
||||
anthroveFaves = append(anthroveFaves, pageAnthroveFaves...)
|
||||
newPosts = append(newPosts, pageNewPosts...)
|
||||
|
||||
span.AddEvent("Finished executing BatchPostProcessingWithSummery")
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing")
|
||||
return taskSummery, err
|
||||
}
|
||||
|
||||
if len(anthroveFaves) < 0 {
|
||||
span.AddEvent("No more favorites found to add")
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
|
||||
break outer
|
||||
}
|
||||
|
||||
nextPage = favorites.NextPage
|
||||
taskSummery.AddedPosts += int(summery.AddedFavorites)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break outer
|
||||
default:
|
||||
span.AddEvent("Executing getFavorites request")
|
||||
favorites, err := plugInterface.GetFavoritePage(ctx, apiKey, userSource, nextPage)
|
||||
span.AddEvent("Finished executing getFavorites request")
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed to execute favorites page")
|
||||
return taskSummery, err
|
||||
}
|
||||
|
||||
if len(favorites.Posts) == 0 {
|
||||
span.AddEvent("No more favorites found")
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
|
||||
break outer
|
||||
}
|
||||
|
||||
span.AddEvent("Executing BatchPostProcessingWithSummery")
|
||||
pageNewPosts, pageAnthroveFaves, err := BatchPostProcessingWithSummery(ctx, userSource, favorites.Posts)
|
||||
|
||||
anthroveFaves = append(anthroveFaves, pageAnthroveFaves...)
|
||||
newPosts = append(newPosts, pageNewPosts...)
|
||||
|
||||
span.AddEvent("Finished executing BatchPostProcessingWithSummery")
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).WithError(err).Error("Failed in BatchPostProcessing")
|
||||
return taskSummery, err
|
||||
}
|
||||
|
||||
if len(anthroveFaves) <= 0 || len(favorites.Posts) != len(anthroveFaves) {
|
||||
span.AddEvent("No more favorites found to add")
|
||||
log.WithContext(ctx).WithFields(basicLoggingInfo).Info("No more favorites found")
|
||||
break outer
|
||||
}
|
||||
|
||||
nextPage = favorites.NextPage
|
||||
taskSummery.AddedPosts += len(pageAnthroveFaves)
|
||||
}
|
||||
break outer
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ type BatchSummery struct {
|
||||
AddedFavorites int64
|
||||
}
|
||||
|
||||
func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, BatchSummery, error) {
|
||||
func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserSource, posts []models.Post) ([]models.Post, []models.UserFavorite, error) {
|
||||
ctx, span := tracer.Start(ctx, "BatchPostProcessing")
|
||||
defer span.End()
|
||||
|
||||
@ -55,18 +55,18 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
||||
span.AddEvent("Collected post IDs", trace.WithAttributes(attribute.Int("post_count", len(postIDs))))
|
||||
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("post_count", len(postIDs)).Info("Collected post IDs")
|
||||
|
||||
existingPosts, err := getAnthrovePost(ctx, db, userSource.SourceID, postIDs)
|
||||
existingPostsReferences, err := getAnthrovePostReferences(ctx, db, userSource.SourceID, postIDs)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.WithContext(ctx).WithError(err).WithFields(BasicLoggingFields).Error("Failed to fetch existing posts")
|
||||
return nil, nil, BatchSummery{}, err
|
||||
}
|
||||
span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPosts))))
|
||||
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPosts)).Info("Fetched existing posts")
|
||||
span.AddEvent("Fetched existing posts", trace.WithAttributes(attribute.Int("existing_post_count", len(existingPostsReferences))))
|
||||
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_post_count", len(existingPostsReferences)).Info("Fetched existing posts")
|
||||
|
||||
var existingPostIDs []models.PostID
|
||||
for _, post := range existingPosts {
|
||||
for _, post := range existingPostsReferences {
|
||||
existingPostIDs = append(existingPostIDs, models.PostID(post.PostID))
|
||||
}
|
||||
|
||||
@ -81,10 +81,10 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
||||
span.AddEvent("Fetched existing favorite posts", trace.WithAttributes(attribute.Int("existing_fav_post_count", len(existingFavPostIDs))))
|
||||
log.WithContext(ctx).WithFields(BasicLoggingFields).WithField("existing_fav_post_count", len(existingFavPostIDs)).Info("Fetched existing favorite posts")
|
||||
|
||||
anthroveFaves := make([]models.UserFavorite, 0, len(existingPosts))
|
||||
newPosts := make([]models.Post, 0, len(existingPosts))
|
||||
anthroveFaves := make([]models.UserFavorite, 0, len(existingPostsReferences))
|
||||
newPosts := make([]models.Post, 0, len(existingPostsReferences))
|
||||
for i, post := range posts {
|
||||
if !slices.ContainsFunc(existingPosts, func(reference models.PostReference) bool {
|
||||
if !slices.ContainsFunc(existingPostsReferences, func(reference models.PostReference) bool {
|
||||
found := reference.SourcePostID == post.References[0].SourcePostID
|
||||
if found {
|
||||
if !slices.Contains(existingFavPostIDs, models.PostID(reference.PostID)) {
|
||||
@ -112,17 +112,14 @@ func BatchPostProcessingWithSummery(ctx context.Context, userSource models.UserS
|
||||
span.AddEvent("Processed posts for favorites and new posts", trace.WithAttributes(attribute.Int("new_post_count", len(newPosts)), attribute.Int("new_fav_count", len(anthroveFaves))))
|
||||
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Processed posts for favorites and new posts")
|
||||
|
||||
return newPosts, anthroveFaves, BatchSummery{
|
||||
AddedPosts: int64(len(newPosts)),
|
||||
AddedFavorites: int64(len(anthroveFaves)),
|
||||
}, nil
|
||||
return newPosts, anthroveFaves, nil
|
||||
}
|
||||
|
||||
func getAnthrovePost(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) {
|
||||
ctx, span := tracer.Start(ctx, "getAnthrovePost")
|
||||
func getAnthrovePostReferences(ctx context.Context, gorm *gorm.DB, id models.SourceID, postIDs []string) ([]models.PostReference, error) {
|
||||
ctx, span := tracer.Start(ctx, "getAnthrovePostReferences")
|
||||
defer span.End()
|
||||
|
||||
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePost")
|
||||
log.WithContext(ctx).WithFields(BasicLoggingFields).Info("Starting getAnthrovePostReferences")
|
||||
|
||||
var existingPosts []models.PostReference
|
||||
err := gorm.WithContext(ctx).Model(models.PostReference{}).Find(&existingPosts, "source_id = ? AND source_post_id IN ?", id, postIDs).Error
|
||||
|
Loading…
Reference in New Issue
Block a user