Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/bricksllm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/redis/go-redis/v9"
)

const numEventConsumers = 16

func main() {
modePtr := flag.String("m", "dev", "select the mode that bricksllm runs in")
privacyPtr := flag.String("p", "strict", "select the privacy mode that bricksllm runs in")
Expand Down Expand Up @@ -361,12 +363,12 @@ func main() {
c := cache.NewCache(apiCache)

messageBus := message.NewMessageBus()
eventMessageChan := make(chan message.Message)
eventMessageChan := make(chan message.Message, 1000)
messageBus.Subscribe("event", eventMessageChan)

handler := message.NewHandler(rec, log, ace, ce, vllme, aoe, v, uv, m, um, rlm, accessCache, userAccessCache)

eventConsumer := message.NewConsumer(eventMessageChan, log, 4, handler.HandleEventWithRequestAndResponse)
eventConsumer := message.NewConsumer(eventMessageChan, log, numEventConsumers, handler.HandleEventWithRequestAndResponse)
eventConsumer.StartEventMessageConsumers()
Comment thread
sergei-bronnikov marked this conversation as resolved.

detector, err := amazon.NewClient(cfg.AmazonRequestTimeout, cfg.AmazonConnectionTimeout, log, cfg.AmazonRegion)
Expand Down
23 changes: 20 additions & 3 deletions internal/message/consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package message

import (
"sync"

"github.com/bricks-cloud/bricksllm/internal/event"
"github.com/bricks-cloud/bricksllm/internal/key"
"go.uber.org/zap"
Expand All @@ -12,6 +14,7 @@ type Consumer struct {
log *zap.Logger
numOfEventConsumers int
handle func(Message) error
wg sync.WaitGroup
}

type recorder interface {
Expand All @@ -33,12 +36,25 @@ func NewConsumer(mc <-chan Message, log *zap.Logger, num int, handle func(Messag

func (c *Consumer) StartEventMessageConsumers() {
for i := 0; i < c.numOfEventConsumers; i++ {
c.wg.Add(1)
go func() {
defer c.wg.Done()

for {
select {
case <-c.done:
c.log.Info("event message consumer stoped...")
return
for {
select {
case m := <-c.messageChan:
err := c.handle(m)
if err != nil {
continue
}
default:
c.log.Info("event message consumer stoped...")
return
}
}

case m := <-c.messageChan:
err := c.handle(m)
Expand All @@ -56,5 +72,6 @@ func (c *Consumer) StartEventMessageConsumers() {
func (c *Consumer) Stop() {
c.log.Info("shutting down consumer...")

c.done <- true
close(c.done)
c.wg.Wait()
}
6 changes: 6 additions & 0 deletions internal/provider/openai/cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ func init() {
for model := range OpenAiPerThousandTokenCost["images-tokens-input"] {
imageModelsWithTokensCost[model] = struct{}{}
}

for _, tool := range AllowedTools {
AllowedToolsSet[tool] = struct{}{}
}
}

var OpenAiPerThousandCallsToolCost = map[string]float64{
Expand Down Expand Up @@ -401,6 +405,8 @@ var AllowedTools = []string{
"skills",
}

var AllowedToolsSet = map[string]struct{}{}

type tokenCounter interface {
Count(model string, input string) (int, error)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/server/web/proxy/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func getMessagesHandler(prod, private bool, client http.Client, e anthropicEstim
model := c.GetString("model")

if !isStreaming && res.StatusCode == http.StatusOK {
dur := time.Now().Sub(start)
dur := time.Since(start)
telemetry.Timing("bricksllm.proxy.get_messages_handler.latency", dur, nil, 1)

bytes, err := io.ReadAll(res.Body)
Expand Down
Loading
Loading