diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 3a2b5f5..837d980 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -81,7 +81,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) log.WithContext(taskCtx).WithFields(log.Fields{ "task_id": id, "user_source_id": creation.UserSourceId, - }).Info("Starting task") + }).Debug("Starting task") go func() { err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() { @@ -92,7 +92,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } else { - log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully") + log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully") span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id))) } }() @@ -120,7 +120,7 @@ func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu log.WithContext(ctx).WithFields(log.Fields{ "task_id": task.TaskId, "task_state": plugTaskState.TaskState, - }).Info("Task status requested") + }).Debug("Task status requested") span.SetAttributes(attribute.String("task_id", task.TaskId)) return &plugTaskState, nil @@ -141,7 +141,7 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu log.WithContext(ctx).WithFields(log.Fields{ "task_id": task.TaskId, "task_state": plugTaskState.TaskState, - }).Info("Task cancellation requested") + }).Debug("Task cancellation requested") span.SetAttributes(attribute.String("task_id", task.TaskId)) return &plugTaskState, nil @@ -155,3 +155,96 @@ func (s *server) removeTask(taskID string) { fn() delete(s.ctx, taskID) } + +func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { + ctx, span := tracer.Start(ctx, "GetUserMessages") + defer span.End() + + userSourceID := models.UserSourceID(message.UserSourceId) + + userSource, err := database.GetUserSourceByID(ctx, userSourceID) + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).Error("Getting userSource") + return nil, err + } + + messages, err := s.getMessageExecution(ctx, userSource) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).Error("Execution function") + return nil, err + } + + var response gRPC.GetMessagesResponse + for _, message := range messages { + response.Messages = append(response.Messages, &gRPC.Message{ + FromUserSourceId: string(userSource.ID), + CreatedAt: message.CreatedAt, + Body: message.Body, + Title: message.Title, + }) + } + + span.SetAttributes( + attribute.String("user_source_id", string(userSource.ID)), + attribute.String("user_id", string(userSource.UserID)), + attribute.String("source_id", string(userSource.SourceID)), + ) + + fields := log.Fields{ + "user_source_id": userSource.ID, + "user_id": userSource.UserID, + "source_id": userSource.SourceID, + "len_messages": len(messages), + } + + log.WithContext(ctx).WithFields(fields).Debug("Got User messages") + + return &response, err + +} + +func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) { + ctx, span := tracer.Start(ctx, "SendMessage") + defer span.End() + + response := &gRPC.SendMessageResponse{ + Success: false, + } + + sourceID := models.UserSourceID(message.UserSourceId) + userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}} + + err := s.sendMessageExecution(ctx, userSource, message.Message) + if err != nil { + log.WithContext(ctx).WithError(err).Error("Sending message execution") + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return response, err + } + + response.Success = true + return response, err +} + +func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongResponse, error) { + ctx, span := tracer.Start(ctx, "Ping") + defer span.End() + + var pong gRPC.PongResponse + pong.Message = ping.Message + pong.Timestamp = ping.Timestamp + + fields := log.Fields{ + "messsage": ping.Message, + "timestamp": ping.Timestamp, + } + log.WithContext(ctx).WithFields(fields).Trace("Got pinged") + + return &pong, nil +}