dev/extending-sdk-with-missing-grpc-functions #8
@ -81,7 +81,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
|
||||
log.WithContext(taskCtx).WithFields(log.Fields{
|
||||
"task_id": id,
|
||||
"user_source_id": creation.UserSourceId,
|
||||
}).Info("Starting task")
|
||||
}).Debug("Starting task")
|
||||
|
||||
go func() {
|
||||
err := s.taskExecutionFunction(taskCtx, userSource, creation.DeepScrape, creation.ApiKey, func() {
|
||||
@ -92,7 +92,7 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
} else {
|
||||
log.WithContext(taskCtx).WithField("task_id", id).Info("Task completed successfully")
|
||||
log.WithContext(taskCtx).WithField("task_id", id).Debug("Task completed successfully")
|
||||
span.AddEvent("Task completed successfully", trace.WithAttributes(attribute.String("task_id", id)))
|
||||
}
|
||||
}()
|
||||
@ -120,7 +120,7 @@ func (s *server) TaskStatus(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
|
||||
log.WithContext(ctx).WithFields(log.Fields{
|
||||
"task_id": task.TaskId,
|
||||
"task_state": plugTaskState.TaskState,
|
||||
}).Info("Task status requested")
|
||||
}).Debug("Task status requested")
|
||||
|
||||
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
||||
return &plugTaskState, nil
|
||||
@ -141,7 +141,7 @@ func (s *server) TaskCancel(ctx context.Context, task *gRPC.PlugTask) (*gRPC.Plu
|
||||
log.WithContext(ctx).WithFields(log.Fields{
|
||||
"task_id": task.TaskId,
|
||||
"task_state": plugTaskState.TaskState,
|
||||
}).Info("Task cancellation requested")
|
||||
}).Debug("Task cancellation requested")
|
||||
|
||||
span.SetAttributes(attribute.String("task_id", task.TaskId))
|
||||
return &plugTaskState, nil
|
||||
@ -163,9 +163,11 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR
|
||||
userSourceID := models.UserSourceID(message.UserSourceId)
|
||||
|
||||
userSource, err := database.GetUserSourceByID(ctx, userSourceID)
|
||||
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.WithContext(ctx).WithError(err).Error("getting userSource")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -173,6 +175,7 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.WithContext(ctx).WithError(err).Error("execution function")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -192,6 +195,15 @@ func (s *server) GetUserMessages(ctx context.Context, message *gRPC.GetMessagesR
|
||||
attribute.String("source_id", string(userSource.SourceID)),
|
||||
)
|
||||
|
||||
fields := log.Fields{
|
||||
"user_source_id": userSource.ID,
|
||||
"user_id": userSource.UserID,
|
||||
"source_id": userSource.SourceID,
|
||||
"len_messages": len(messages),
|
||||
}
|
||||
|
||||
log.WithContext(ctx).WithFields(fields).Debug("Got User messages")
|
||||
|
||||
return &response, err
|
||||
|
||||
}
|
||||
@ -209,6 +221,7 @@ func (s *server) SendMessage(ctx context.Context, message *gRPC.SendMessageReque
|
||||
|
||||
err := s.sendMessageExecution(ctx, userSource, message.Message)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).WithError(err).Error("sending message executiom")
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
|
||||
@ -227,5 +240,11 @@ func (s *server) Ping(ctx context.Context, ping *gRPC.PingRequest) (*gRPC.PongRe
|
||||
pong.Message = ping.Message
|
||||
pong.Timestamp = ping.Timestamp
|
||||
|
||||
fields := log.Fields{
|
||||
"messsage": ping.Message,
|
||||
"timestamp": ping.Timestamp,
|
||||
}
|
||||
log.WithContext(ctx).WithFields(fields).Trace("got pinged")
|
||||
|
||||
return &pong, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user