Code Monkey home page Code Monkey logo

Comments (5)

heliosgo avatar heliosgo commented on June 12, 2024

其实目前的逻辑就是舍弃旧的,具体可以看看

gmqtt/server/server.go

Lines 342 to 499 in a28cd4f

func (srv *server) registerClient(connect *packets.Connect, client *client) (sessionResume bool, err error) {
var qs queue.Store
var ua unack.Store
var sess *gmqtt.Session
var oldSession *gmqtt.Session
now := time.Now()
oldSession, err = srv.lockDuplicatedID(client)
if err != nil {
return
}
defer func() {
if err == nil {
var willMsg *gmqtt.Message
var willDelayInterval, expiryInterval uint32
if connect.WillFlag {
willMsg = &gmqtt.Message{
QoS: connect.WillQos,
Topic: string(connect.WillTopic),
Payload: connect.WillMsg,
}
setWillProperties(connect.WillProperties, willMsg)
}
// use default expiry if the client version is version3.1.1
if packets.IsVersion3X(client.version) && !connect.CleanStart {
expiryInterval = uint32(srv.config.MQTT.SessionExpiry.Seconds())
} else if connect.Properties != nil {
willDelayInterval = convertUint32(connect.WillProperties.WillDelayInterval, 0)
expiryInterval = client.opts.SessionExpiry
}
sess = &gmqtt.Session{
ClientID: client.opts.ClientID,
Will: willMsg,
ConnectedAt: time.Now(),
WillDelayInterval: willDelayInterval,
ExpiryInterval: expiryInterval,
}
err = srv.sessionStore.Set(sess)
}
if err == nil {
client.session = sess
if sessionResume {
// If a new Network Connection to this Session is made before the Will Delay Interval has passed,
// the Server MUST NOT send the Will Message [MQTT-3.1.3-9].
if w, ok := srv.willMessage[client.opts.ClientID]; ok {
w.signal(false)
}
if srv.hooks.OnSessionResumed != nil {
srv.hooks.OnSessionResumed(context.Background(), client)
}
srv.statsManager.sessionActive(false)
} else {
if srv.hooks.OnSessionCreated != nil {
srv.hooks.OnSessionCreated(context.Background(), client)
}
srv.statsManager.sessionActive(true)
}
srv.clients[client.opts.ClientID] = client
srv.unackStore[client.opts.ClientID] = ua
srv.queueStore[client.opts.ClientID] = qs
client.queueStore = qs
client.unackStore = ua
if client.version == packets.Version5 {
client.topicAliasManager = srv.newTopicAliasManager(client.config, client.opts.ClientTopicAliasMax, client.opts.ClientID)
}
}
srv.mu.Unlock()
}()
client.setConnected(time.Now())
if srv.hooks.OnConnected != nil {
srv.hooks.OnConnected(context.Background(), client)
}
srv.statsManager.clientConnected(client.opts.ClientID)
if oldSession != nil {
if !oldSession.IsExpired(now) && !connect.CleanStart {
sessionResume = true
}
// clean old session
if !sessionResume {
err = srv.sessionTerminatedLocked(oldSession.ClientID, TakenOverTermination)
if err != nil {
err = fmt.Errorf("session terminated fail: %w", err)
zaplog.Error("session terminated fail", zap.Error(err))
}
// Send will message because the previous session is ended.
if w, ok := srv.willMessage[client.opts.ClientID]; ok {
w.signal(true)
}
} else {
qs = srv.queueStore[client.opts.ClientID]
if qs != nil {
err = qs.Init(&queue.InitOptions{
CleanStart: false,
Version: client.version,
ReadBytesLimit: client.opts.ClientMaxPacketSize,
Notifier: client.queueNotifier,
})
if err != nil {
return
}
}
ua = srv.unackStore[client.opts.ClientID]
if ua != nil {
err = ua.Init(false)
if err != nil {
return
}
}
if ua == nil || qs == nil {
// This could happen if backend store loss some data which will bring the session into "inconsistent state".
// We should create a new session and prevent the client reuse the inconsistent one.
sessionResume = false
zaplog.Error("detect inconsistent session state",
zap.String("remote_addr", client.rwc.RemoteAddr().String()),
zap.String("client_id", client.opts.ClientID))
} else {
zaplog.Info("logged in with session reuse",
zap.String("remote_addr", client.rwc.RemoteAddr().String()),
zap.String("client_id", client.opts.ClientID))
}
}
}
if !sessionResume {
// create new session
// It is ok to pass nil to defaultNotifier, because we will call Init to override it.
qs, err = srv.persistence.NewQueueStore(srv.config, nil, client.opts.ClientID)
if err != nil {
return
}
err = qs.Init(&queue.InitOptions{
CleanStart: true,
Version: client.version,
ReadBytesLimit: client.opts.ClientMaxPacketSize,
Notifier: client.queueNotifier,
})
if err != nil {
return
}
ua, err = srv.persistence.NewUnackStore(srv.config, client.opts.ClientID)
if err != nil {
return
}
err = ua.Init(true)
if err != nil {
return
}
zaplog.Info("logged in with new session",
zap.String("remote_addr", client.rwc.RemoteAddr().String()),
zap.String("client_id", client.opts.ClientID),
)
}
delete(srv.offlineClients, client.opts.ClientID)
return
}

oldSession, err = srv.lockDuplicatedID(client)

from gmqtt.

luohao123 avatar luohao123 commented on June 12, 2024

那为啥每次会报错 login with duplated id,然后客户端就掉线了

from gmqtt.

heliosgo avatar heliosgo commented on June 12, 2024

那为啥每次会报错 login with duplated id,然后客户端就掉线了

掉线的是旧的,会与新连接建立会话。正常是这样的才对。我刚测试了一下,好像没啥问题

from gmqtt.

luohao123 avatar luohao123 commented on June 12, 2024

@jiennyx shamblett/mqtt_client#460 (comment)

你这么一说,似乎有点道理,似乎好像是 我用client_id 5566 一开始链接了,然后如果服务端似乎有一个链接缓存,再次以5566 链接的时候,它就会断开老的,但是由于实际上老的就是本次的链接,所以表现为我一连上,立马就断开了。

可以这么理解吗?

from gmqtt.

heliosgo avatar heliosgo commented on June 12, 2024

@luohao123 不太理解 但是由于实际上老的就是本次的链接,所以表现为我一连上,立马就断开了。
我贴段日志把,看起来清楚点。

2023-11-15T16:34:17.030+0800 INFO server/server.go:327 logging w
ith duplicate ClientID {"remote": "192.168.19.118:61779", "client_id": "mqttx_67fa4d05"}

2023-11-15T16:34:17.031+0800 WARN server/client.go:274 connectio
n lost {"client_id": "mqttx_67fa4d05", "remote_addr": "192.168.19.118:61768", "error": "operation
error: Code = 8e, reasonString: "}

2023-11-15T16:34:17.031+0800 INFO server/server.go:599 logged ou
t and cleaning session {"remote_addr": "192.168.19.118:61768", "client_id": "mqttx_67fa4d05"}

2023-11-15T16:34:17.032+0800 INFO server/server.go:492 logged in
with new session {"remote_addr": "192.168.19.118:61779", "client_id": "mqttx_67fa4d05"}

:61768 被断开,:61779 正常建立。

from gmqtt.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.