Compare commits
4 Commits
117c15e371
...
6b7f570027
Author | SHA1 | Date | |
---|---|---|---|
|
6b7f570027 | ||
|
b58206fca0 | ||
|
74f3c934a7 | ||
|
fdca511587 |
@ -61,7 +61,6 @@ func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation)
|
|||||||
// by using this method we assign a new context to each new request we get.
|
// by using this method we assign a new context to each new request we get.
|
||||||
// This can be used for example to close the context with the given id
|
// This can be used for example to close the context with the given id
|
||||||
ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()}))
|
ctx = trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: span.SpanContext().TraceID()}))
|
||||||
|
|
||||||
taskCtx, cancel := context.WithCancel(ctx)
|
taskCtx, cancel := context.WithCancel(ctx)
|
||||||
s.ctx[id] = cancel
|
s.ctx[id] = cancel
|
||||||
span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id)))
|
span.AddEvent("Created new context for task", trace.WithAttributes(attribute.String("task_id", id)))
|
||||||
|
31
pkg/plug/otlp.go
Normal file
31
pkg/plug/otlp.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package plug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SetupOpenTelemtry(ctx context.Context, serviceName string) error {
|
||||||
|
ctx, span := tracer.Start(ctx, "SetupOpenTelemtry")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
err := telemetry.SetupMeterProvider(serviceName)
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = telemetry.SetupTraceProvider(ctx, serviceName)
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -2,13 +2,11 @@ package plug
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/database"
|
||||||
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
"git.anthrove.art/Anthrove/otter-space-sdk/v3/pkg/models"
|
||||||
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
pb "git.anthrove.art/Anthrove/plug-sdk/v3/pkg/grpc"
|
||||||
"git.anthrove.art/Anthrove/plug-sdk/v3/pkg/telemetry"
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
@ -29,57 +27,40 @@ type TaskExecution func(ctx context.Context, userSource models.UserSource, deepS
|
|||||||
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
|
type SendMessageExecution func(ctx context.Context, userSource models.UserSource, message string) error
|
||||||
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
|
type GetMessageExecution func(ctx context.Context, userSource models.UserSource) ([]Message, error)
|
||||||
|
|
||||||
type Plug struct {
|
var (
|
||||||
address string
|
|
||||||
port string
|
|
||||||
ctx context.Context
|
|
||||||
taskExecutionFunction TaskExecution
|
taskExecutionFunction TaskExecution
|
||||||
sendMessageExecution SendMessageExecution
|
sendMessageExecution SendMessageExecution
|
||||||
getMessageExecution GetMessageExecution
|
getMessageExecution GetMessageExecution
|
||||||
source models.Source
|
)
|
||||||
plugName string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPlug(ctx context.Context, plugName string, address string, port string, source models.Source) Plug {
|
func Listen(ctx context.Context, listenAddr string, source models.Source) error {
|
||||||
return Plug{
|
ctx, span := tracer.Start(ctx, "Listen")
|
||||||
ctx: ctx,
|
|
||||||
address: address,
|
|
||||||
port: port,
|
|
||||||
source: source,
|
|
||||||
plugName: plugName,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Plug) Listen() error {
|
|
||||||
ctx, span := tracer.Start(p.ctx, "Listen")
|
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var source models.Source
|
|
||||||
|
|
||||||
span.SetAttributes(
|
span.SetAttributes(
|
||||||
attribute.String("source_display_name", string(p.source.DisplayName)),
|
attribute.String("source_display_name", string(source.DisplayName)),
|
||||||
attribute.String("source_domain", string(p.source.Domain)),
|
attribute.String("source_domain", string(source.Domain)),
|
||||||
)
|
)
|
||||||
|
|
||||||
sourceFields := log.Fields{
|
sourceFields := log.Fields{
|
||||||
"source_display_name": p.source.DisplayName,
|
"source_display_name": source.DisplayName,
|
||||||
"source_domain": p.source.Domain,
|
"source_domain": source.Domain,
|
||||||
}
|
}
|
||||||
|
|
||||||
serverFields := log.Fields{
|
serverFields := log.Fields{
|
||||||
"address": p.address,
|
"address": listenAddr,
|
||||||
"port": p.port,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
source, err = database.GetSourceByDomain(ctx, p.source.Domain)
|
source, err = database.GetSourceByDomain(ctx, source.Domain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err.Error() == "Database error: NoDataFound" {
|
if err.Error() == "Database error: NoDataFound" {
|
||||||
span.AddEvent("No Source found, initializing source")
|
span.AddEvent("No Source found, initializing source")
|
||||||
|
|
||||||
log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")
|
log.WithContext(ctx).WithFields(sourceFields).Info("No Source found, initializing source!")
|
||||||
|
|
||||||
source, err = database.CreateSource(ctx, p.source)
|
source, err = database.CreateSource(ctx, source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
@ -98,13 +79,11 @@ func (p *Plug) Listen() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.source = source
|
|
||||||
|
|
||||||
span.SetAttributes(
|
span.SetAttributes(
|
||||||
attribute.String("source_id", string(p.source.ID)),
|
attribute.String("source_id", string(source.ID)),
|
||||||
)
|
)
|
||||||
|
|
||||||
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", p.address, p.port))
|
lis, err := net.Listen("tcp", listenAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
@ -114,70 +93,34 @@ func (p *Plug) Listen() error {
|
|||||||
|
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
|
|
||||||
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(p.source, p.taskExecutionFunction, p.sendMessageExecution, p.getMessageExecution))
|
pb.RegisterPlugConnectorServer(grpcServer, NewGrpcServer(source, taskExecutionFunction, sendMessageExecution, getMessageExecution))
|
||||||
|
|
||||||
err = grpcServer.Serve(lis)
|
go func() {
|
||||||
if err != nil {
|
err = grpcServer.Serve(lis)
|
||||||
span.RecordError(err)
|
if err != nil {
|
||||||
span.SetStatus(codes.Error, err.Error())
|
span.RecordError(err)
|
||||||
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
|
span.SetStatus(codes.Error, err.Error())
|
||||||
return err
|
log.WithContext(ctx).WithError(err).Error("Failed to serve gRPC server")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.WithContext(ctx).Info("Context stopped! Shutdown Plug")
|
||||||
|
grpcServer.GracefulStop()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) GetSource() models.Source {
|
func SetTaskExecutionFunction(function TaskExecution) {
|
||||||
_, span := tracer.Start(p.ctx, "GetSource")
|
taskExecutionFunction = function
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
span.SetAttributes(attribute.String("source_domain", string(p.source.Domain)))
|
|
||||||
return p.source
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) TaskExecutionFunction(function TaskExecution) {
|
func SetSendMessageExecutionFunction(function SendMessageExecution) {
|
||||||
_, span := tracer.Start(p.ctx, "TaskExecutionFunction")
|
sendMessageExecution = function
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
p.taskExecutionFunction = function
|
|
||||||
span.AddEvent("Task execution function set")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Plug) SendMessageExecution(function SendMessageExecution) {
|
func SetGetMessageExecutionFunction(function GetMessageExecution) {
|
||||||
_, span := tracer.Start(p.ctx, "SendMessageExecution")
|
getMessageExecution = function
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
p.sendMessageExecution = function
|
|
||||||
span.AddEvent("Send message execution function set")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Plug) GetMessageExecution(function GetMessageExecution) {
|
|
||||||
_, span := tracer.Start(p.ctx, "GetMessageExecution")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
p.getMessageExecution = function
|
|
||||||
span.AddEvent("Get message execution function set")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p Plug) SetupOpenTelemtry() error {
|
|
||||||
ctx, span := tracer.Start(p.ctx, "SetupOpenTelemtry")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
err := telemetry.SetupMeterProvider(p.plugName)
|
|
||||||
if err != nil {
|
|
||||||
span.RecordError(err)
|
|
||||||
span.SetStatus(codes.Error, err.Error())
|
|
||||||
log.WithContext(ctx).WithError(err).Error("Failed to setup meter provider")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = telemetry.SetupTraceProvider(ctx, p.plugName)
|
|
||||||
if err != nil {
|
|
||||||
span.RecordError(err)
|
|
||||||
span.SetStatus(codes.Error, err.Error())
|
|
||||||
log.WithContext(ctx).WithError(err).Error("Failed to setup trace provider")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user