segmentio/kafka-go writer: why does it hold the connection after I stop sending messages?

386 views Asked by At

I have a problem with my writer. In tcpdump I can see that it keeps sending requests to Kafka even when I am not sending any messages to it. Maybe somebody has already solved this?

This is the code for connecting, sending and closing:

package kafka

import (
    "chrome-grabber/config"
    "chrome-grabber/grabber"
    logger "chrome-grabber/log"
    "context"
    "github.com/mailru/easyjson"
    "github.com/segmentio/kafka-go"
    "net"
)

// Package-level state shared by the Kafka helpers in this file.
var (
    // KafkaMessenger is the process-wide messenger. Its fields are filled
    // in by Init.
    KafkaMessenger = Messenger{}

    // IsConnectedToKafka starts as true, is set to false when marshalling or
    // sending a message fails, and is refreshed by CheckConnection.
    IsConnectedToKafka = true
)

// getKafkaProducer returns a new kafka.Writer that publishes to topic on the
// given brokers. The writer uses the least-bytes balancer, a batch size of
// 16384 messages, the package-level kafka.DefaultTransport, and routes both
// regular and error log output to logger.Debug.
func getKafkaProducer(brokers []string, topic string) *kafka.Writer {
    debugLog := kafka.LoggerFunc(logger.Debug)

    writer := &kafka.Writer{
        Addr:        kafka.TCP(brokers...),
        Topic:       topic,
        Balancer:    &kafka.LeastBytes{},
        BatchSize:   16384,
        Transport:   kafka.DefaultTransport,
        Logger:      debugLog,
        ErrorLogger: debugLog,
    }
    return writer
}

// sendKafkaMessage serialises message with easyjson and writes it through
// kafkaWriter, using message.Key as the record key and message.Time as the
// record timestamp.
//
// A marshalling failure is logged, sets IsConnectedToKafka to false and is
// returned as is. Otherwise the result of WriteMessages is returned.
func sendKafkaMessage(ctx context.Context, kafkaWriter *kafka.Writer, message *grabber.KafkaMessage) error {
    payload, marshalErr := easyjson.Marshal(message)
    if marshalErr != nil {
        IsConnectedToKafka = false
        logger.Error("Could not marshal message: %v", marshalErr)
        return marshalErr
    }

    var record kafka.Message
    record.Key = []byte(message.Key)
    record.Value = payload
    record.Time = message.Time

    logger.Info("Message to output: %s", record)
    return kafkaWriter.WriteMessages(ctx, record)
}

// ChannelMessage pairs a message with the name of the topic it is meant for.
// Messenger.Send compares Topic against the topic names in the grabber
// configuration to pick the producer.
type ChannelMessage struct {
    Message *grabber.KafkaMessage
    Topic   string
}

// Messenger bundles the Kafka writers and related state used to publish
// grabber messages.
type Messenger struct {
    // requestProducer writes to the network-request topic.
    requestProducer *kafka.Writer
    // logProducer writes to the browser-log topic.
    logProducer     *kafka.Writer
    // transport is stored by Init; Send and CloseConnection call
    // CloseIdleConnections on it.
    transport       *kafka.Transport
    // ctx is set by Init and CloseConnection and is passed to every write
    // performed by Send.
    ctx             context.Context
}

// Init configures the package-level KafkaMessenger: it stores ctx, builds one
// shared kafka.Transport and creates the request and browser-log writers from
// the loaded grabber configuration.
//
// Both writers are explicitly attached to the shared transport. Without this
// they keep using kafka.DefaultTransport (set by getKafkaProducer), so the
// CloseIdleConnections calls made on KafkaMessenger.transport elsewhere in
// this file would not touch the connections the writers actually hold.
func Init(ctx context.Context) {

    logger.Info("init kafka")

    grabberConfig := config.LoadConfig()
    KafkaMessenger.ctx = ctx
    transport := &kafka.Transport{
        // The network argument is ignored: connections are always dialled
        // over "tcp".
        Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
            d := &net.Dialer{}
            return d.DialContext(ctx, "tcp", address)
        },
    }
    KafkaMessenger.transport = transport

    requestProducer := getKafkaProducer(grabberConfig.KafkaBrokers, grabberConfig.KafkaNetworkRequestTopic)
    requestProducer.Transport = transport
    KafkaMessenger.requestProducer = requestProducer

    logProducer := getKafkaProducer(grabberConfig.KafkaBrokers, grabberConfig.KafkaBrowserLogTopic)
    logProducer.Transport = transport
    KafkaMessenger.logProducer = logProducer

    logger.Error("Status: %v", KafkaMessenger.requestProducer.Stats().Errors)
}

func CloseConnection(ctx context.Context) {
    logger.Debug("Close connection to kafka")

    KafkaMessenger.ctx = ctx
    err := KafkaMessenger.requestProducer.Close()
    if err != nil {
        logger.Error("close requestProducer connection error: %v", err)
        return
    }
    err = KafkaMessenger.logProducer.Close()
    if err != nil {
        logger.Error("close logProducer connection error: %v", err)
        return
    }
    KafkaMessenger.transport.CloseIdleConnections()
}

// CheckConnection probes Kafka by dialling the first configured broker and
// updates IsConnectedToKafka with the outcome.
//
// It returns nil when the dial succeeds. It returns the dial error, or a
// *net.AddrError when no broker is configured, otherwise. The probe
// connection is closed before returning.
func CheckConnection() error {
    logger.Info("Start to check connection")
    grabberConfig := config.LoadConfig()

    // Guard the index below: an empty broker list would otherwise panic.
    if len(grabberConfig.KafkaBrokers) == 0 {
        err := &net.AddrError{Err: "no kafka brokers configured"}
        logger.Error("failed connect to kafka: %v", err)
        IsConnectedToKafka = false
        return err
    }

    conn, err := kafka.Dial("tcp", grabberConfig.KafkaBrokers[0])
    if err != nil {
        logger.Error("failed connect to kafka: %v", err)
        IsConnectedToKafka = false
        return err
    }

    logger.Info("Close dial")
    IsConnectedToKafka = true
    // The probe succeeded; a failure to close it is only worth a debug line.
    if closeErr := conn.Close(); closeErr != nil {
        logger.Debug("close kafka probe connection error: %v", closeErr)
    }
    return nil
}

// Send publishes msg through the producer that matches msg.Topic: the
// browser-log producer or the network-request producer. Messages for any
// other topic are ignored.
//
// On a failed write IsConnectedToKafka is set to false, the failed writer is
// closed and a fresh writer takes its place. After each attempt the idle
// connections of the messenger transport are closed.
//
// NOTE(review): the browser listener starts Send with `go`, and Send
// reassigns m.logProducer / m.requestProducer without synchronisation —
// confirm whether these fields need a mutex.
func (m *Messenger) Send(msg *ChannelMessage, grabberConfig *grabber.Config) {

    logger.Info("Start sending")
    var ctx = m.ctx

    switch msg.Topic {

    case grabberConfig.KafkaBrowserLogTopic:
        producer := m.logProducer
        err := sendKafkaMessage(ctx, producer, msg.Message)
        if err != nil {
            IsConnectedToKafka = false
            logger.Error("Could not write message to browser log topic: %v", err)
            m.logProducer = m.renewProducer(producer, grabberConfig.KafkaBrokers, grabberConfig.KafkaBrowserLogTopic)
        }
        m.transport.CloseIdleConnections()

    case grabberConfig.KafkaNetworkRequestTopic:
        producer := m.requestProducer
        err := sendKafkaMessage(ctx, producer, msg.Message)
        if err != nil {
            IsConnectedToKafka = false
            logger.Error("Could not write message to request topic: %v", err)
            m.requestProducer = m.renewProducer(producer, grabberConfig.KafkaBrokers, grabberConfig.KafkaNetworkRequestTopic)
        }
        m.transport.CloseIdleConnections()
    }
}

// renewProducer closes the writer that just failed, so it is not abandoned
// while still open, and returns a replacement writer for topic. The
// replacement is attached to the messenger transport when one is set, so the
// CloseIdleConnections calls in Send apply to its connections.
func (m *Messenger) renewProducer(failed *kafka.Writer, brokers []string, topic string) *kafka.Writer {
    if failed != nil {
        if err := failed.Close(); err != nil {
            logger.Error("close failed producer for topic %s error: %v", topic, err)
        }
    }

    fresh := getKafkaProducer(brokers, topic)
    if m.transport != nil {
        fresh.Transport = m.transport
    }
    return fresh
}

**And this is the code for the Chrome grabber, which intercepts events and sends them to Kafka. I am trying to implement a circuit breaker and stop sending any traffic to Kafka, but the traffic is still present even when I don't send messages and I close the channels.**

package browser

import (
    "chrome-grabber/config"
    "chrome-grabber/grabber"
    grabberHttp "chrome-grabber/http"
    "chrome-grabber/kafka"
    logger "chrome-grabber/log"
    "chrome-grabber/utils"
    "context"
    "encoding/json"
    "fmt"
    cdpLog "github.com/chromedp/cdproto/log"
    "github.com/chromedp/cdproto/network"
    "github.com/chromedp/cdproto/target"
    "github.com/chromedp/chromedp"
    "net"
    "net/http"
    "os"
    "time"
)

var (
    // closeBrowserConnection is the channel used to signal that it is time
    // to stop collecting information from the browser.
    closeBrowserConnection = make(chan bool)

    // Enable gates publishing: the browser event listener only sends
    // messages to Kafka while it is true. It is recomputed on every tick of
    // CheckBrowserPortEnabled.
    Enable bool
)

// getCurrentPage looks up the browser target of type "page" whose ID equals
// targetId and returns the page info built from its URL.
//
// It returns nil when the target list cannot be fetched or when no matching
// page target exists.
func getCurrentPage(ctx context.Context, targetId string) *grabber.KafkaMessagePageInfo {
    targets, err := chromedp.Targets(ctx)
    if err != nil {
        // Pass the error as an argument, not as the format string, so that
        // '%' characters in the error text are not read as format verbs.
        logger.Error("%v", err)
        return nil
    }

    for _, browserTarget := range targets {
        if browserTarget.Type == "page" && browserTarget.TargetID.String() == targetId {
            return grabber.GetKafkaMessagePageInfo(browserTarget.URL)
        }
    }
    logger.Debug("Could not find page browserTarget with id: %v", targetId)
    return nil
}

// checkChromeContext reports whether ctx is still a usable chromedp context.
// It does so by asking the browser for the current page title and logging
// the result.
func checkChromeContext(ctx context.Context) bool {
    logger.Debug("check chrome context")

    var pageTitle string
    if runErr := chromedp.Run(ctx, chromedp.Title(&pageTitle)); runErr != nil {
        logger.Error("error with chrome context: %v", runErr)
        return false
    }

    logger.Debug("context ok with title: %s", pageTitle)
    return true
}

// CheckPortOpen reports whether a TCP connection to host:port can be
// established within one second. The probe connection is closed before the
// function returns; a dial error is logged at debug level.
func CheckPortOpen(host string, port int) bool {
    address := net.JoinHostPort(host, fmt.Sprintf("%d", port))
    conn, dialErr := net.DialTimeout("tcp", address, time.Second)

    if conn != nil {
        defer conn.Close()
    }
    if dialErr != nil {
        logger.Debug("%v", dialErr)
        return false
    }
    return conn != nil
}

// getRemoteDebugContext queries http://localhost:<port>/json and returns the
// WebSocket debugger URL, the target ID and the title of the first entry of
// type "page" that carries a WebSocket debugger URL.
//
// It returns three empty strings when the request fails, the response cannot
// be decoded or no such entry exists. Request and decode errors are reported
// through logger.PanicError.
func getRemoteDebugContext(port int) (string, string, string) {
    resp, err := http.Get(fmt.Sprintf("http://localhost:%d/json", port))
    if err != nil {
        logger.PanicError(err)
        return "", "", ""
    }
    // Release the response body so the underlying connection is not leaked.
    defer resp.Body.Close()

    var result []map[string]interface{}

    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        logger.PanicError(err)
        return "", "", ""
    }
    for _, item := range result {
        if kind, _ := item["type"].(string); kind != "page" {
            continue
        }
        // Checked assertions: a page entry without a string debugger URL is
        // skipped instead of panicking, and a missing id/title becomes "".
        debugURL, ok := item["webSocketDebuggerUrl"].(string)
        if !ok {
            continue
        }
        id, _ := item["id"].(string)
        title, _ := item["title"].(string)
        return debugURL, id, title
    }
    return "", "", ""
}

// tryConnectToBrowser marks the grabber as running, runs one
// ConnectToBrowser session and marks the grabber as not running once that
// session returns.
func tryConnectToBrowser(ctx context.Context, port int, chromeChannel chan context.Context) {

    logger.Debug("trying connect to Browser")
    config.SetGrabberRunning()
    // ConnectToBrowser returns only after it receives from the
    // closeBrowserConnection channel.
    ConnectToBrowser(ctx, port, chromeChannel)
    config.SetGrabberNotRunning()
}

// CheckBrowserPortEnabled is the supervising loop of the grabber. On every
// tick of the configured port-check interval it:
//
//   - checks whether the browser debug port on localhost is open;
//   - every third tick asks the grabber-status service whether sending is
//     allowed, and closes the Kafka connection when it is not;
//   - while Kafka is considered disconnected, re-probes it with a back-off;
//   - recomputes Enable from the Kafka and grabber-status flags;
//   - starts or stops the browser session according to the port state.
//
// The function never returns.
func CheckBrowserPortEnabled(ctx context.Context) {
    var host = "localhost"
    var chromeChannel = make(chan context.Context)
    var chromeContext context.Context
    interval := time.Duration(config.LoadConfig().PortCheckInterval)
    ticker := time.NewTicker(interval * time.Second)
    isConnectedToKafka := config.LoadConfig().CheckKafkaConnectionFlag
    isGrabberStatusAvailable := config.LoadConfig().CheckKafkaConnectionFlag
    kafkaInterval := config.LoadConfig().KafkaCheckInterval
    var i, z = 0, 0
    var count = 0
    hostname, _ := os.Hostname() // host name, used only in log messages
    // Check the grabber-availability service once at start-up; the answer is
    // only logged here.
    answer, _ := grabberHttp.Ping(config.LoadConfig().UrlToCheckService)
    logger.Error("! Is available sending messages by grabber_status service: %v for %v!", answer, hostname)
    for {
        <-ticker.C
        var port = config.LoadConfig().BrowserPort
        isPortOpen := CheckPortOpen(host, port)

        isRunning := config.IsGrabberRunning()
        isConnectedToKafka = kafka.IsConnectedToKafka

        // Poll the status endpoint every third tick (the original comment
        // says "every three minutes") and block sending when it answers
        // false or fails.
        if z >= 3 {
            answer, err := grabberHttp.Ping(config.LoadConfig().UrlToCheckService)

            // Check err first so that answer is only inspected after a
            // successful ping.
            if err != nil || !answer.GrabberEnable {
                logger.Error("! The grabber_status service returned false or error, sending messages is unavailable for %v!", hostname)
                isGrabberStatusAvailable = false
                kafka.CloseConnection(ctx)

            } else {
                logger.Error("! The grabber_status service returned true, sending messages available for %v!", hostname)
                isGrabberStatusAvailable = true
            }
            z = 0
        }
        z++
        // Kafka connectivity check: once a send attempt has failed, Kafka is
        // re-probed periodically until it answers again.
        if !isConnectedToKafka {
            i += int(interval)      // grows by the configured interval on every tick
            if i >= kafkaInterval { // after three probes the probe interval becomes one hour and stops growing
                i = 0
                count++
                if count >= 3 {
                    kafkaInterval = 3600

                }
                isConnectedToKafka = kafka.CheckConnection() == nil

            }
        } else {
            count = 0
            kafkaInterval = config.LoadConfig().KafkaCheckInterval
        }

        Enable = isConnectedToKafka && isGrabberStatusAvailable
        logger.Debug("Enable: %v", Enable)
        logger.Debug("debug port open: %v\tgrabber running: %v\t Enable: %v", isPortOpen, isRunning, Enable)
        logger.Debug("isConnectedToKafka: %v\tisGrabberStatusAvailable: %v", isConnectedToKafka, isGrabberStatusAvailable)
        if isRunning && !isPortOpen {
            closeBrowserConnection <- true
            config.SetGrabberNotRunning()
            // Connect to the browser only when Kafka is reachable and the
            // status service allows sending.
        } else if !isRunning && isPortOpen && isConnectedToKafka && isGrabberStatusAvailable {
            logger.Debug(" go tryConnectToBrowser")
            go tryConnectToBrowser(ctx, port, chromeChannel)
            // Wait for the session to publish its chromedp context.
            chromeContext = <-chromeChannel
        }

        if config.IsGrabberRunning() {
            if !checkChromeContext(chromeContext) {
                closeBrowserConnection <- true
            }
        }
    }
}

// ConnectToBrowser attaches to the browser page exposed on the remote debug
// port, enables the Log and Network domains and forwards the resulting
// events to Kafka while Enable is true.
//
// Once listening is set up it sends the chromedp context on chromeChannel
// and then blocks until a value arrives on closeBrowserConnection. Both
// chromedp contexts are cancelled on return.
func ConnectToBrowser(ctx context.Context, port int, chromeChannel chan context.Context) {
    debugURL, targetID, title := getRemoteDebugContext(port)
    if debugURL == "" {
        logger.Panic("Could not start browser log grabber")
    }

    // Allocator context: the base for the browser context created below.
    allocCtx, cancelAlloc := chromedp.NewRemoteAllocator(ctx, debugURL)
    defer cancelAlloc()

    // Browser context, pointed at the page found above.
    browserCtx, cancelBrowser := chromedp.NewContext(allocCtx)
    defer cancelBrowser()
    chromedp.WithTargetID(target.ID(targetID))(chromedp.FromContext(browserCtx))

    if err := chromedp.Run(browserCtx, cdpLog.Enable()); err != nil {
        logger.Panic("Failed enable Log Domain: %v", err)
    }

    if err := chromedp.Run(browserCtx, network.Enable()); err != nil {
        logger.Panic("Failed enable Network Domain: %v", err)
    }

    grabberConfig := config.LoadConfig()

    // publish hands one message over to the Kafka messenger in its own
    // goroutine.
    publish := func(topic string, message *grabber.KafkaMessage) {
        go kafka.KafkaMessenger.Send(&kafka.ChannelMessage{
            Message: message,
            Topic:   topic,
        }, grabberConfig)
    }

    // Listen to browser events.
    chromedp.ListenTarget(browserCtx, func(event interface{}) {
        switch e := event.(type) {

        case *network.EventRequestWillBeSent:
            // Remember when the request started.
            utils.Store(e)
            if !Enable {
                return
            }
            publish(
                grabberConfig.KafkaNetworkRequestTopic,
                grabber.CreateRequestMessage(grabberConfig.Uid, e, getCurrentPage(browserCtx, targetID)),
            )

        case *network.EventLoadingFinished:
            startNs := utils.FixDuration(e.RequestID)
            if !Enable {
                return
            }
            publish(
                grabberConfig.KafkaNetworkRequestTopic,
                grabber.CreateCompleteRequestMessage(grabberConfig.Uid, e, startNs),
            )

        case *network.EventLoadingFailed:
            startNs := utils.FixDuration(e.RequestID)
            if !Enable {
                return
            }
            publish(
                grabberConfig.KafkaNetworkRequestTopic,
                grabber.CreateFailedRequestMessage(grabberConfig.Uid, e, startNs),
            )

        case *network.EventResponseReceived:
            utils.StoreThisShit(e)
            if !Enable {
                return
            }
            publish(
                grabberConfig.KafkaNetworkRequestTopic,
                grabber.CreateResponseReceivedMessage(grabberConfig.Uid, e),
            )

        case *cdpLog.EventEntryAdded:
            // Only error-level browser log entries are forwarded.
            if e.Entry.Level != "error" || !Enable {
                return
            }
            publish(
                grabberConfig.KafkaBrowserLogTopic,
                grabber.CreateBrowserLogMessage(grabberConfig.Uid, e, getCurrentPage(browserCtx, targetID)),
            )
        }
    })

    logger.Info("Connected to %s\nWaits close Browser", title)
    chromeChannel <- browserCtx
    <-closeBrowserConnection
}

I tried to implement a circuit breaker and stop sending any traffic to Kafka.

0

There are 0 answers