From 42954c7cf12d4f501e72942079caa31bc46bb7c3 Mon Sep 17 00:00:00 2001 From: SoXX Date: Wed, 4 Sep 2024 13:41:29 +0200 Subject: [PATCH 1/5] feat(grpc): added ping added ping function --- pkg/plug/grpc.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 3a2b5f5..f769565 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -155,3 +155,21 @@ func (s *server) removeTask(taskID string) { fn() delete(s.ctx, taskID) } + +func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { + +} + +func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) { +} + +func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongResponse, error) { + ctx, span := tracer.Start(ctx, "Ping") + defer span.End() + + var pong gRPC.PongResponse + pong.Message = ping.Message + pong.Timestamp = ping.Timestamp + + return &pong, nil +} -- 2.45.2 From 601ce9eae7455902e22cbcb3f2f854c7c1f948f3 Mon Sep 17 00:00:00 2001 From: SoXX Date: Wed, 4 Sep 2024 14:01:04 +0200 Subject: [PATCH 2/5] feat(grpc): added GetUserMessages & SendMessage finalized first draft for the endpoint integration --- pkg/plug/grpc.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index f769565..9dc64f6 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -157,10 +157,54 @@ func (s *server) removeTask(taskID string) { } func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) { + ctx, span := tracer.Start(ctx, "GetUserMessages") + defer span.End() + + sourceID := models.UserSourceID(message.UserSourceId) + userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}} + + messages, err := s.getMessageExecution(ctx, userSource) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + var response gRPC.GetMessagesResponse + for _, message := range messages { + response.Messages = append(response.Messages, &gRPC.Message{ + FromUserSourceId: string(userSource.ID), + CreatedAt: message.CreatedAt, + Body: message.Body, + Title: message.Title, + }) + } + + return &response, err } func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) { + ctx, span := tracer.Start(ctx, "SendMessage") + defer span.End() + + response := &gRPC.SendMessageResponse{ + Success: false, + } + + sourceID := models.UserSourceID(message.UserSourceId) + userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}} + + err := s.sendMessageExecution(ctx, userSource, message.Message) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return response, err + } + + response.Success = true + return response, err } func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongResponse, error) { -- 2.45.2 From 2c8d6bd682fb77a4233dc1f3217e706dc5fb2316 Mon Sep 17 00:00:00 2001 From: SoXX Date: Wed, 4 Sep 2024 14:17:00 +0200 Subject: [PATCH 3/5] fix(grpc): get necessary information getting data from database for the userSource --- pkg/plug/grpc.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 9dc64f6..36b7b7b 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -160,8 +160,14 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR ctx, span := tracer.Start(ctx, "GetUserMessages") defer span.End() - sourceID := models.UserSourceID(message.UserSourceId) - userSource := models.UserSource{BaseModel: models.BaseModel[models.UserSourceID]{ID: sourceID}} + userSourceID := models.UserSourceID(message.UserSourceId) + + userSource, err := database.GetUserSourceByID(ctx, userSourceID) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } messages, err := s.getMessageExecution(ctx, userSource) if err != nil { @@ -180,6 +186,12 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR }) } + span.SetAttributes( + attribute.String("user_source_id", string(userSource.ID)), + attribute.String("user_id", string(userSource.UserID)), + attribute.String("source_id", string(userSource.SourceID)), + ) + return &response, err } -- 2.45.2 From b497348d2abd5cd04fed86f28e98a93eda2b9349 Mon Sep 17 00:00:00 2001 From: SoXX Date: Wed, 4 Sep 2024 14:53:01 +0200 Subject: [PATCH 4/5] feat(grpc): add logging --- pkg/plug/grpc.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index 36b7b7b..dc8756e 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -81,7 +81,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) log.WithContext(taskCtx).WithFields(log.Fields{ "task_id": id, "user_source_id": creation.UserSourceId, - }).Info("Starting task") + }).Debug("Starting task") go func() { err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() { @@ -92,7 +92,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } else { - log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully") + log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully") span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id))) } }() @@ -120,7 +120,7 @@ func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu log.WithContext(ctx).WithFields(log.Fields{ "task_id": task.TaskId, "task_state": plugTaskState.TaskState, - }).Info("Task status requested") + }).Debug("Task status requested") span.SetAttributes(attribute.String("task_id", task.TaskId)) return &plugTaskState, nil @@ -141,7 +141,7 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu log.WithContext(ctx).WithFields(log.Fields{ "task_id": task.TaskId, "task_state": plugTaskState.TaskState, - }).Info("Task cancellation requested") + }).Debug("Task cancellation requested") span.SetAttributes(attribute.String("task_id", task.TaskId)) return &plugTaskState, nil @@ -163,9 +163,11 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR userSourceID := models.UserSourceID(message.UserSourceId) userSource, err := database.GetUserSourceByID(ctx, userSourceID) + if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).Error("getting userSource") return nil, err } @@ -173,6 +175,7 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + log.WithContext(ctx).WithError(err).Error("execution function") return nil, err } @@ -192,6 +195,15 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR attribute.String("source_id", string(userSource.SourceID)), ) + fields := log.Fields{ + "user_source_id": userSource.ID, + "user_id": userSource.UserID, + "source_id": userSource.SourceID, + "len_messages": len(messages), + } + + log.WithContext(ctx).WithFields(fields).Debug("Got User messages") + return &response, err } @@ -209,6 +221,7 @@ func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageReque err := s.sendMessageExecution(ctx, userSource, message.Message) if err != nil { + log.WithContext(ctx).WithError(err).Error("sending message executiom") span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -227,5 +240,11 @@ func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongRe pong.Message = ping.Message pong.Timestamp = ping.Timestamp + fields := log.Fields{ + "messsage": ping.Message, + "timestamp": ping.Timestamp, + } + log.WithContext(ctx).WithFields(fields).Trace("got pinged") + return &pong, nil } -- 2.45.2 From 38cecba45503d1ecdf75d4ea295f05e6be27f29a Mon Sep 17 00:00:00 2001 From: SoXX Date: Wed, 4 Sep 2024 14:56:27 +0200 Subject: [PATCH 5/5] fix(grpc): spelling --- pkg/plug/grpc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/plug/grpc.go b/pkg/plug/grpc.go index dc8756e..837d980 100644 --- a/pkg/plug/grpc.go +++ b/pkg/plug/grpc.go @@ -167,7 +167,7 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithError(err).Error("getting userSource") + log.WithContext(ctx).WithError(err).Error("Getting userSource") return nil, err } @@ -175,7 +175,7 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - log.WithContext(ctx).WithError(err).Error("execution function") + log.WithContext(ctx).WithError(err).Error("Execution function") return nil, err } @@ -221,7 +221,7 @@ func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageReque err := s.sendMessageExecution(ctx, userSource, message.Message) if err != nil { - log.WithContext(ctx).WithError(err).Error("sending message executiom") + log.WithContext(ctx).WithError(err).Error("Sending message execution") span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -244,7 +244,7 @@ func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongRe "messsage": ping.Message, "timestamp": ping.Timestamp, } - log.WithContext(ctx).WithFields(fields).Trace("got pinged") + log.WithContext(ctx).WithFields(fields).Trace("Got pinged") return &pong, nil } -- 2.45.2