Comments (43)

JasontieZhu avatar JasontieZhu commented on July 17, 2024 1

OK, in the if (cnode instanceof TNode) branch, remove should call cleanTomb again, until the tomb is deleted successfully.

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
    Token token = topic.headToken();
    if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
        Topic remainingTopic = topic.exceptHeadToken();
        INode nextInode = inode.mainNode().childOf(token);
        return remove(clientId, remainingTopic, nextInode, inode);
    } else {
        final CNode cnode = inode.mainNode();
        if (cnode instanceof TNode) {
            // this inode is a tomb, has no clients and should be cleaned up
            // Because we implemented cleanTomb below, this should be rare, but possible
            // Consider calling cleanTomb here too
            return cleanTomb(inode, iParent);
        }
        if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
            // last client to leave this node, AND there are no downstream children, remove via TNode tomb
            if (inode == this.root) {
                return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
            }
            TNode tnode = new TNode();
            return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
        } else if (cnode.contains(clientId) && topic.isEmpty()) {
            CNode updatedCnode = cnode.copy();
            updatedCnode.removeSubscriptionsFor(clientId);
            return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
        } else {
            //someone else already removed
            return Action.OK;
        }
    }
}

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

We want to modify the CTrie remove function as follows:

if (cnode instanceof TNode) {
    // this inode is a tomb, has no clients and should be cleaned up
    // Because we implemented cleanTomb below, this should be rare, but possible
    // Consider calling cleanTomb here too
    return cleanTomb(inode, iParent); // cleanTomb already returns Action.OK or Action.REPEAT
}

A background thread could also be used to clear leftover TNodes automatically.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

There is a possible failure sequence:
1. inode.compareAndSet succeeds, but cleanTomb fails to remove the node, so the action becomes REPEAT.
2. remove runs again and finds that the cnode has changed to a TNode, so it returns Action.OK and the node is never cleaned up.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

I found that the TNode's token is null, because:

1. In the old version, Ctrie.remove creates the tomb with new TNode(); in the new version it uses new TNode(cnode.getToken()).
2. In the new version, createNodeAndInsertSubscription can reuse a TNode; in the old version it cannot.

So if removing the TNode fails, the number of TNodes keeps growing in the old version, but not in the new version.

The code is as follows.
New version:
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
    // last client to leave this node, AND there are no downstream children, remove via TNode tomb
    if (inode == this.root) {
        return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
    }
    TNode tnode = new TNode(cnode.getToken());
    return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;

Old version:
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
    // last client to leave this node, AND there are no downstream children, remove via TNode tomb
    if (inode == this.root) {
        return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
    }
    TNode tnode = new TNode();
    return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;

private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode cnode, SubscriptionRequest request) {
    final INode newInode = createPathRec(topic, request);
    final CNode updatedCnode;
    if (cnode instanceof TNode) {
        // a tomb is replaced by a fresh CNode that carries the same token
        updatedCnode = new CNode(cnode.getToken());
    } else {
        updatedCnode = cnode.copy();
    }
    updatedCnode.add(newInode);

    return inode.compareAndSet(cnode, updatedCnode) ? Action.OK_NEW : Action.REPEAT;
}

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

I found that this may already be solved by commit 62cb2b3 from hylkevds.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

In which scenarios does the system fail to clear TNodes automatically?

The only one I can see is that setting the TNode succeeds, but removing the TNode fails.

I can't figure out why that happens.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024
private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

public void remove(INode node) {
    this.children.remove(node) ;
}

Why not just change it to:

private Action cleanTomb(INode inode, INode iParent) {
    return iParent.mainNode().remove(inode) ? Action.OK : Action.REPEAT;
}


public boolean remove(INode node) {
    return this.children.remove(node);
}

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

The compareAndSet is there because many parallel threads may try to edit the same node at the same time, and only one can "win" and have its changes accepted. The other threads have to retry.
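
To illustrate the "one wins, the others retry" idea, here is a minimal sketch. It does not use the moquette classes: the children are modelled as a plain list of strings held in an AtomicReference, and the names CasRetryDemo, removeChild and run are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class CasRetryDemo {
    // Hypothetical stand-in for a parent node: an immutable snapshot swapped atomically.
    private static final AtomicReference<List<String>> children = new AtomicReference<>();

    // Copy, modify the copy, then publish it only if nobody else published first.
    // Returns the number of attempts that were needed.
    static int removeChild(String name) {
        int attempts = 0;
        while (true) {
            attempts++;
            List<String> original = children.get();            // read the current value once
            List<String> updated = new ArrayList<>(original);  // private copy, safe to edit
            updated.remove(name);
            if (children.compareAndSet(original, Collections.unmodifiableList(updated))) {
                return attempts;                                // this thread "won"
            }
            // another thread replaced the list in the meantime: loop and retry
        }
    }

    // Two threads remove different children concurrently; returns the final list.
    static List<String> run() {
        children.set(Collections.unmodifiableList(
                Arrays.asList("users1", "users2", "users3", "users4")));
        Thread t0 = new Thread(() -> removeChild("users1"));
        Thread t1 = new Thread(() -> removeChild("users2"));
        t0.start();
        t1.start();
        try {
            t0.join();
            t1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return children.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // both removals survive, whatever the interleaving
    }
}
```

Because the expected value passed to compareAndSet is the same object that was copied, a thread that loses the race sees a failed compare and repeats the whole copy-and-edit step on the newer list.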

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

1. However, a simulation with forced timing shows that two threads can enter cleanTomb together; the later thread overwrites the result of the earlier thread, and both calls return success.

2. I just use sleep to increase the probability of the problem, and use two threads to call cleanTomb at the same time.

3. The following is the test code:
private static String cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    String result = iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}
4. Assume that the original parent node has four children: users1, users2, users3, users4.
Two threads (Thread-0 and Thread-1) enter cleanTomb at the same time. Thread-0 removes users1 and Thread-1 removes users2.
The expected result is that the parent node has two children, users3 and users4. The actual result is that Thread-0 sets last and the parent node still has three children: users2, users3, users4.

5. The output is:
Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb doing updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb doing updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-1 cleanTomb result:OK
Thead: Thread-0 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-0 cleanTomb result:OK

6. Both threads return OK, so neither of them retries.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

1. The cause of the problem is that copy() gives every caller its own independent copy of the node.
Each thread that enters cleanTomb works on its own copy.
If two threads enter cleanTomb at the same time, both sets succeed, but the thread that sets last overwrites the result of the earlier thread, so the TNode of the earlier thread is not deleted.

new ArrayList<>(children) creates a new, independent list (strictly a shallow copy: the list is new, the elements are shared).

private CNode(Token token, List<INode> children, Set<Subscription> subscriptions) {
    this.token = token; // keep reference, root comparison in directory logic relies on it for now.
    this.subscriptions = new HashSet<>(subscriptions);
    this.children = new ArrayList<>(children);
}

2. For example:
List<String> aList = Arrays.asList("a", "b", "c");
List<String> bList = new ArrayList<>(aList);
System.out.println("aList:" + aList);
System.out.println("bList:" + bList);
bList.remove("b");
System.out.println("aList remove result:" + aList);
System.out.println("bList remove result:" + bList);
bList.set(0,"vvvv");
System.out.println("aList set result:" + aList);
System.out.println("bList set result:" + bList);

3. The output is below; aList is not modified:
aList:[a, b, c]
bList:[a, b, c]
aList remove result:[a, b, c]
bList remove result:[a, c]
aList set result:[a, b, c]
bList set result:[vvvv, c]
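
A side note on the copy semantics, as a small sketch: the ArrayList copy constructor copies the list itself, not the elements in it. Adding, removing or replacing entries in the copy does not touch the original list, but a change made to a shared element is visible through both. The Box class below is a hypothetical stand-in for a mutable element; it is not a moquette class.

```java
import java.util.ArrayList;
import java.util.List;

public class ListCopyDemo {
    // Hypothetical mutable element, standing in for something like an INode.
    static final class Box {
        String value;

        Box(String value) {
            this.value = value;
        }
    }

    // Returns "<original size> <copy size> <value of original's first element>".
    static String run() {
        List<Box> original = new ArrayList<>();
        original.add(new Box("a"));
        original.add(new Box("b"));

        // New list object, but it holds references to the same two Box instances.
        List<Box> copy = new ArrayList<>(original);

        copy.remove(1);                 // structural change: only the copy shrinks
        copy.get(0).value = "changed";  // element change: seen through both lists

        return original.size() + " " + copy.size() + " " + original.get(0).value;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "2 1 changed"
    }
}
```

For cleanTomb the structural independence is the part that matters: each thread removes its child from a private list, so the removal only becomes visible when that list is published.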

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

Even if REPEAT is returned, the retry runs the branch below, performs no operation, and returns Action.OK, so the TNode is not deleted.
As time goes on, more and more TNodes are left undeleted and the tree accumulates more and more child nodes, reaching more than 7 million.
As a result, simply executing inode.mainNode().anyChildrenMatch causes high CPU usage in version 0.16.

        final CNode cnode = inode.mainNode();
        if (cnode instanceof TNode) {
            // this inode is a tomb, has no clients and should be cleaned up
            // Because we implemented cleanTomb below, this should be rare, but possible
            // Consider calling cleanTomb here too
            return Action.OK; // returns without cleaning the tomb
        }

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

Theoretically, this problem is solved in version 0.17, where the token of the TNode is no longer empty. Even if the TNode is not deleted, it is reused the next time a node with the same token is added, and the TNode is converted back to a CNode. In that case the number of TNodes will not grow to 7 million, unless the tokens are never reused.

See commit 62cb2b3 by hylkevds.

private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode cnode, Subscription newSubscription) {
    final INode newInode = createPathRec(topic, newSubscription);
    final CNode updatedCnode;
    if (cnode instanceof TNode) {
        updatedCnode = new CNode(cnode.getToken());
    } else {
        updatedCnode = cnode.copy();
    }
    updatedCnode.add(newInode);
    return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
}

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

Therefore, to avoid this situation, I recommend removing the node directly, after checking whether the node exists in the child set.

private Action cleanTomb(INode inode, INode iParent) {
    return iParent.mainNode().remove(inode) ? Action.OK : Action.REPEAT;
}

public boolean remove(INode node) {
    if (!this.children.contains(node)) {
        return true;
    }
    return this.children.remove(node);
}

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

Are you testing this on the development version, or on a released version?

private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

The problem is that iParent.mainNode() is called twice. That is wrong. The first argument of the compareAndSet must be the original node that we expect to replace. But because iParent.mainNode() is called twice, the node we are replacing may have been replaced by another thread in the meantime, and our copy is no longer valid. But we don't notice, because we're not comparing to the node we copied. Instead we're comparing to whatever is the current mainNode.

So the code should be:

private Action cleanTomb(INode inode, INode iParent) {
    CNode origCnode = iParent.mainNode();
    CNode updatedCnode = origCnode.copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(origCnode, updatedCnode) ? Action.OK : Action.REPEAT;
}

Can you please test that?
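
The difference between the two versions above can be replayed without any threads. The sketch below is an illustration only: a list of strings in an AtomicReference stands in for the parent's main node, and StaleCasDemo, without and replay are hypothetical names. Both "threads" take their copy first, then commit one after the other.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class StaleCasDemo {
    // Returns a copy of source with one entry removed.
    static List<String> without(List<String> source, String name) {
        List<String> copy = new ArrayList<>(source);
        copy.remove(name);
        return copy;
    }

    // Replays the interleaving "A copies, B copies, A commits, B commits".
    // rereadExpected = true mimics compareAndSet(iParent.mainNode(), updatedCnode);
    // rereadExpected = false mimics compareAndSet(origCnode, updatedCnode).
    static String replay(boolean rereadExpected) {
        AtomicReference<List<String>> parent =
                new AtomicReference<>(Arrays.asList("users1", "users2", "users3", "users4"));

        List<String> origA = parent.get();
        List<String> updatedA = without(origA, "users1");
        List<String> origB = parent.get();
        List<String> updatedB = without(origB, "users2");

        boolean okA = parent.compareAndSet(rereadExpected ? parent.get() : origA, updatedA);
        boolean okB = parent.compareAndSet(rereadExpected ? parent.get() : origB, updatedB);

        return okA + " " + okB + " " + parent.get();
    }

    public static void main(String[] args) {
        System.out.println(replay(true));  // true true [users1, users3, users4]
        System.out.println(replay(false)); // true false [users2, users3, users4]
    }
}
```

With the re-read, B compares the current value against itself, so the compare cannot fail and A's removal of users1 is silently undone. With the saved original, B's compare fails and B knows it has to retry.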

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

1. I am testing version 0.16. The link is:
https://github.com/moquette-io/moquette/blob/0.16/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java

2. cleanTomb is at line 204:

private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

3. You can use the following test code:

import java.util.concurrent.atomic.AtomicInteger;

public class Main {
private static INode root = null;
private static INode app = null;
private static INode users = null;

private static AtomicInteger count = new AtomicInteger();
private static AtomicInteger removeCount = new AtomicInteger();

public static void init() {
    CNode croot = new CNode();
    croot.setToken(new Token("root"));
    root = new INode(croot);
    CNode capp = new CNode();
    capp.setToken(new Token("APP"));
    app = new INode(capp);
    CNode cusers = new CNode();
    cusers.setToken(new Token("users"));
    users = new INode(cusers);
    croot.add(app);
    capp.add(users);
    add();
}

private static String cleanTomb(INode inode, INode iParent) {
    CNode origCnode = iParent.mainNode();
    CNode updatedCnode = origCnode.copy();
    updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    String result = iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}

private static INode createLeafNodes(Token token) {
    CNode newLeafCnode = new CNode();
    newLeafCnode.setToken(token);
    return new INode(newLeafCnode);
}

public static void add() {
    int i = 3;
    while (i >= 0) {
        count.incrementAndGet();
        Integer id = count.get();
        users.mainNode().add(createLeafNodes(new Token("users" + id)));
        i--;
    }
    System.out.println("add result list:" + users.mainNode());

}

public static void main(String[] args) {
    init();
    Runnable removeRunnable = () -> {
        int id = removeCount.incrementAndGet(); // take the id atomically so each thread gets a distinct one
        System.out.println("Thead: " + Thread.currentThread().getName() + " remove: users" + id);
        System.out.println("Thead: " + Thread.currentThread().getName() + " users list: " + users.mainNode());
        INode child = users.mainNode().childOf(new Token("users" + id));
        cleanTomb(child, users);
    };

    Thread t2 = new Thread(removeRunnable);
    Thread t3 = new Thread(removeRunnable);
    t2.start();
    t3.start();
    try {
        t2.join();
        t3.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

4. The result is that only one node is deleted, not two:
add result list:users1 users2 users3 users4
Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb doing updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb result iParent.mainNode:users1 users3 users4
Thead: Thread-1 cleanTomb result:OK
Thead: Thread-0 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-0 cleanTomb result:OK

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

In a multi-threaded scenario, this problem is likely to occur.
When several threads run cleanTomb at the same time, they all start from a copy of the same node.
In this case, the thread that sets later overwrites the result of the thread that set earlier.
The more clients are connected, and the more of them unsubscribe at the same time, the higher the concurrency of cleanTomb.
That is why our environment has 5 million TNodes that were never cleaned up.

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

You should really use at least 0.17 for testing. A lot has changed since 0.16...

With the fix I just posted, the result of the compareAndSet should no longer be OK when another thread has replaced the original value. It is important to use compareAndSet, precisely to make sure two threads don't overwrite each other's changes.
The two threads may have the same copy when starting the function, but when the first one replaces the content, the compare will fail for the second thread and it will re-try.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

1. iParent.compareAndSet(iParent.mainNode(), updatedCnode) is equivalent to calling mainNode.compareAndSet(mainNode.get(), updatedCnode) on the AtomicReference inside iParent.
The actual value is iParent.mainNode() and the expected value is also iParent.mainNode(). The actual value equals the expected value, so iParent.compareAndSet(iParent.mainNode(), updatedCnode) returns true, not false.

private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

class INode {
    private AtomicReference<CNode> mainNode = new AtomicReference<>();

    INode(CNode mainNode) {
        this.mainNode.set(mainNode);
        if (mainNode instanceof TNode) { // this should never happen
            throw new IllegalStateException("TNode should not be set on mainNnode");
        }
    }

    // the overload that cleanTomb ends up calling
    boolean compareAndSet(CNode old, CNode newNode) {
        return mainNode.compareAndSet(old, newNode);
    }

    boolean compareAndSet(CNode old, TNode newNode) {
        return mainNode.compareAndSet(old, newNode);
    }

    CNode mainNode() {
        return this.mainNode.get();
    }

    boolean isTombed() {
        return this.mainNode() instanceof TNode;
    }
}

/**
 * Atomically sets the value to the given updated value
 * if the current value {@code ==} the expected value.
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(V expect, V update) {
    return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
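
One detail in the Javadoc quoted above is worth a small sketch: the comparison is by reference (==), not by equals(). The class and variable names below are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class CasIdentityDemo {
    // Returns "<equals result> <CAS with an equal copy> <CAS with the same object>".
    static String run() {
        List<String> current = new ArrayList<>();
        current.add("users1");
        // Same content, so equals() is true, but it is a different object.
        List<String> equalCopy = new ArrayList<>(current);

        AtomicReference<List<String>> ref = new AtomicReference<>(current);
        List<String> replacement = new ArrayList<>();

        boolean equalsResult = current.equals(equalCopy);
        boolean casWithCopy = ref.compareAndSet(equalCopy, replacement); // fails: not the same reference
        boolean casWithSame = ref.compareAndSet(current, replacement);   // succeeds: identical reference

        return equalsResult + " " + casWithCopy + " " + casWithSame;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true false true"
    }
}
```

So the expected argument only does its job when it is the exact object that was read before copying. Passing a freshly read value means the reference is compared with itself.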

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

1. iParent.compareAndSet(iParent.mainNode(), updatedCnode) is equivalent to calling mainNode.compareAndSet(mainNode.get(), updatedCnode) on the AtomicReference inside iParent.
The actual value is iParent.mainNode() and the expected value is also iParent.mainNode(). The actual value equals the expected value, so iParent.compareAndSet(iParent.mainNode(), updatedCnode) returns true, not false.

Yes, that's why I explained in my previous post why iParent.compareAndSet(iParent.mainNode(), updatedCnode) is wrong. The problem is the second call to iParent.mainNode(). Instead of re-requesting the mainNode, it should pass the instance that was copied, since the mainNode in the iParent may have been changed by another thread in the time it took to make the copy, but if we pass the original value that was copied that conflict will be detected.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

1. The latest version does the same.
The link is:
https://github.com/moquette-io/moquette/blob/main/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java

/**
 *
 * Cleans Disposes of TNode in separate Atomic CAS operation per
 * http://bravenewgeek.com/breaking-and-entering-lose-the-lock-while-embracing-concurrency/
 * We roughly follow this theory above, but we allow CNode with no Subscriptions to linger (for now).
 *
 * @param inode inode that handle to the tomb node.
 * @param iParent inode parent.
 * @return REPEAT if this method wasn't successful or OK.
 */
private Action cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    updatedCnode.remove(inode);
    return iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? Action.OK : Action.REPEAT;
}

2. I recommend removing the node directly, after checking whether the node exists in the child set. With that change you could also stop using compareAndSet.

private Action cleanTomb(INode inode, INode iParent) {
    return iParent.mainNode().remove(inode) ? Action.OK : Action.REPEAT;
}

public boolean remove(INode node) {
    if (!this.children.contains(node)) {
        return true;
    }
    return this.children.remove(node);
}

3. The test code is:
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
private static INode root = null;
private static INode app = null;
private static INode users = null;

private static AtomicInteger count = new AtomicInteger();
private static AtomicInteger removeCount = new AtomicInteger();

public static void init() {
    CNode croot = new CNode();
    croot.setToken(new Token("root"));
    root = new INode(croot);
    CNode capp = new CNode();
    capp.setToken(new Token("APP"));
    app = new INode(capp);
    CNode cusers = new CNode();
    cusers.setToken(new Token("users"));
    users = new INode(cusers);
    croot.add(app);
    capp.add(users);
    add();
}

private static String cleanTomb(INode inode, INode iParent) {
    CNode updatedCnode = iParent.mainNode().copy();
    // CNode updatedCnode = origCnode.copy();
    // updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    // System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    // String result = iParent.compareAndSet(iParent.mainNode(), updatedCnode) ? "OK" : "REPEAT";
    String result = iParent.mainNode().remove(inode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}

private static INode createLeafNodes(Token token) {
    CNode newLeafCnode = new CNode();
    newLeafCnode.setToken(token);
    return new INode(newLeafCnode);
}

public static void add() {
    int i = 3;
    while (i >= 0) {
        count.incrementAndGet();
        Integer id = count.get();
        users.mainNode().add(createLeafNodes(new Token("users" + id)));
        i--;
    }
    System.out.println("add result list:" + users.mainNode());

}

public static void main(String[] args) {
    init();
    Runnable removeRunnable = () -> {
        int id = removeCount.incrementAndGet(); // take the id atomically so each thread gets a distinct one
        System.out.println("Thead: " + Thread.currentThread().getName() + " remove: users" + id);
        System.out.println("Thead: " + Thread.currentThread().getName() + " users list: " + users.mainNode());
        INode child = users.mainNode().childOf(new Token("users" + id));
        cleanTomb(child, users);
    };

    Thread t2 = new Thread(removeRunnable);
    Thread t3 = new Thread(removeRunnable);
    t2.start();
    t3.start();
    try {
        t2.join();
        t3.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

4. The modified remove method in CNode.java:

public boolean remove(INode node) {
    // this.children.remove(node);
    if (!this.children.contains(node)) {
        return true;
    }
    return this.children.remove(node);
}

5. The test result shows that both nodes have been deleted:
add result list:users1 users2 users3 users4
Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb result iParent.mainNode:users3 users4
Thead: Thread-0 cleanTomb result:OK
Thead: Thread-1 cleanTomb result iParent.mainNode:users3 users4
Thead: Thread-1 cleanTomb result:OK

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

1. The latest version does the same.
The link is:
https://github.com/moquette-io/moquette/blob/main/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java

Of course, I just posted the update for you to test here in the thread, I didn't make a PR yet, let alone that Andsel had time to review that PR and merge it...

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

OK, thank you very much. I'll test it when you're done.

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

You can test it right now by changing the cleanTomb function to the version I put in #841 (comment)

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

1. Yes, one of the threads now fails the compareAndSet. The result is:
add result list:users1 users2 users3 users4
Thead: Thread-0 remove: users1
Thead: Thread-1 remove: users2
Thead: Thread-1 users list: users1 users2 users3 users4
Thead: Thread-0 users list: users1 users2 users3 users4
Thead: Thread-1 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-1 cleanTomb start updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb start iParent:users1 users2 users3 users4
Thead: Thread-0 cleanTomb start updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing iParent.mainNode:users1 users2 users3 users4
Thead: Thread-0 cleanTomb doing updatedCnode:users2 users3 users4
Thead: Thread-1 cleanTomb doing updatedCnode:users1 users3 users4
Thead: Thread-0 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-0 cleanTomb result:OK
Thead: Thread-1 cleanTomb result iParent.mainNode:users2 users3 users4
Thead: Thread-1 cleanTomb result:REPEAT

2. The test code is:
import java.util.concurrent.atomic.AtomicInteger;

public class Main {
private static INode root = null;
private static INode app = null;
private static INode users = null;

private static AtomicInteger count = new AtomicInteger();
private static AtomicInteger removeCount = new AtomicInteger();

public static void init() {
    CNode croot = new CNode();
    croot.setToken(new Token("root"));
    root = new INode(croot);
    CNode capp = new CNode();
    capp.setToken(new Token("APP"));
    app = new INode(capp);
    CNode cusers = new CNode();
    cusers.setToken(new Token("users"));
    users = new INode(cusers);
    croot.add(app);
    capp.add(users);
    add();
}

private static String cleanTomb(INode inode, INode iParent) {
    CNode origCnode  = iParent.mainNode();
    CNode updatedCnode = origCnode.copy();
    updatedCnode.remove(inode);
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start iParent:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb start updatedCnode:" + updatedCnode);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb doing updatedCnode:" + updatedCnode);
    String result = iParent.compareAndSet(origCnode, updatedCnode) ? "OK" : "REPEAT";
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result iParent.mainNode:" + iParent.mainNode());
    System.out.println("Thead: " + Thread.currentThread().getName() + " cleanTomb result:" + result);
    return result;
}

private static INode createLeafNodes(Token token) {
    CNode newLeafCnode = new CNode();
    newLeafCnode.setToken(token);
    return new INode(newLeafCnode);
}

public static void add() {
    int i = 3;
    while (i >= 0) {
        count.incrementAndGet();
        Integer id = count.get();
        users.mainNode().add(createLeafNodes(new Token("users" + id)));
        i--;
    }
    System.out.println("add result list:" + users.mainNode());

}

public static void main(String[] args) {
    init();
    Runnable removeRunnable = () -> {
        int id = removeCount.incrementAndGet(); // take the id atomically so each thread gets a distinct one
        System.out.println("Thead: " + Thread.currentThread().getName() + " remove: users" + id);
        System.out.println("Thead: " + Thread.currentThread().getName() + " users list: " + users.mainNode());
        INode child = users.mainNode().childOf(new Token("users" + id));
        cleanTomb(child, users);
    };

    Thread t2 = new Thread(removeRunnable);
    Thread t3 = new Thread(removeRunnable);
    t2.start();
    t3.start();
    try {
        t2.join();
        t3.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

3. But when the thread that failed retries, the TNode still cannot be deleted,
because inode.compareAndSet(cnode, tnode) has already changed the cnode to a TNode, and the if (cnode instanceof TNode) branch returns Action.OK directly.

public void removeFromTree(Topic topic, String clientID) {
    Action res;
    do {
        res = remove(clientID, topic, this.root, NO_PARENT);
    } while (res == Action.REPEAT);
}

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
    Token token = topic.headToken();
    final CNode cnode = inode.mainNode();
    if (!topic.isEmpty()) {
        Optional<INode> nextInode = cnode.childOf(token);
        if (nextInode.isPresent()) {
            Topic remainingTopic = topic.exceptHeadToken();
            return remove(clientId, remainingTopic, nextInode.get(), inode);
        }
    }
    if (cnode instanceof TNode) {
        // this inode is a tomb, has no clients and should be cleaned up
        // Because we implemented cleanTomb below, this should be rare, but possible
        // Consider calling cleanTomb here too
        return Action.OK; // a retrying thread exits here without cleaning the tomb
    }
    if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
        // last client to leave this node, AND there are no downstream children, remove via TNode tomb
        if (inode == this.root) {
            return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
        }
        TNode tnode = new TNode(cnode.getToken());
        return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
    } else if (cnode.contains(clientId) && topic.isEmpty()) {
        CNode updatedCnode = cnode.copy();
        updatedCnode.removeSubscriptionsFor(clientId);
        return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
    } else {
        //someone else already removed
        return Action.OK;
    }
}
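
The sequence described in point 3 can be reproduced with a small toy model. This is only a sketch under stated assumptions: it does not use moquette's classes, `TombRetrySketch` and all of its members are hypothetical names, and the failed compareAndSet on the parent is simulated with a flag instead of a real second thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model (NOT moquette code): an inode holds either a live node or a tomb.
final class TombRetrySketch {
    enum Action { OK, REPEAT }
    enum State { LIVE, TOMB }

    final AtomicReference<State> inode = new AtomicReference<>(State.LIVE);
    boolean parentStillLinksChild = true;
    // Hypothetical switch: simulates another thread winning the CAS on the parent once.
    boolean parentCasFails = true;

    // Stand-in for cleanTomb: unlinks the child from the parent unless the CAS "fails".
    Action cleanTomb() {
        if (parentCasFails) {
            parentCasFails = false; // contention only hits the first attempt
            return Action.REPEAT;
        }
        parentStillLinksChild = false;
        return Action.OK;
    }

    // Stand-in for remove: cleanTombOnRetry selects between the two variants discussed.
    Action remove(boolean cleanTombOnRetry) {
        State current = inode.get();
        if (current == State.TOMB) {
            // Early return (false) versus calling cleanTomb again (true).
            return cleanTombOnRetry ? cleanTomb() : Action.OK;
        }
        return inode.compareAndSet(current, State.TOMB) ? cleanTomb() : Action.REPEAT;
    }

    // Runs the removeFromTree-style loop and reports whether the tomb is still linked.
    static boolean tombLeftBehind(boolean cleanTombOnRetry) {
        TombRetrySketch trie = new TombRetrySketch();
        Action res;
        do {
            res = trie.remove(cleanTombOnRetry);
        } while (res == Action.REPEAT);
        return trie.parentStillLinksChild;
    }

    public static void main(String[] args) {
        System.out.println("early return, tomb left behind: " + tombLeftBehind(false));
        System.out.println("cleanTomb on retry, tomb left behind: " + tombLeftBehind(true));
    }
}
```

In this model the first pass turns the node into a tomb and the simulated parent CAS fails, so the loop repeats. With the early return, the second pass sees the tomb and reports OK while the parent still links to it.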

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

So I recommend making the change described in the following comment, to ensure that the TNode is deleted:
#841 (comment)

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

> so it is recommended that change in this way to ensure that the TNode is deleted. #841 (comment)

That won't work, since ArrayList is not thread-safe and thread-safe List implementations are not lock-free.

When fixing a bug leads us to another bug, we'll just have to fix that one too.

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

Something else to check: if we remove the last INode from a level, is the parent also cleaned up? We should not be leaving empty nodes hanging around...

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

Do all child nodes become TNodes? Only TNodes can be removed.
If a node is not a TNode, it will not be removed, so the root node is not automatically removed.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

After thinking about it: only one thread's compareAndSet succeeds and the compareAndSet calls of the other threads fail.
When cleanTomb concurrency is high and only one call can succeed each round, calling cleanTomb again during the recursion may increase the contention further, and the recursion may not end.
Therefore, if cleanTomb fails and the node is found to be a tombstone, I suggest attempting the cleanup at most three times. If the cleanup still fails, break out of the recursion.
It would then be better to have a separate thread that regularly handles the deletion of TNodes.

private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
    Token token = topic.headToken();
    final CNode cnode = inode.mainNode();
    if (!topic.isEmpty()) {
        Optional<INode> nextInode = cnode.childOf(token);
        if (nextInode.isPresent()) {
            Topic remainingTopic = topic.exceptHeadToken();
            return remove(clientId, remainingTopic, nextInode.get(), inode);
        }
    }
    if (cnode instanceof TNode) {
        // this inode is a tomb, has no clients and should be cleaned up
        // Because we implemented cleanTomb below, this should be rare, but possible
        // Consider calling cleanTomb here too
        // Try the cleanup at most three times, then give up
        int i = 3;
        while (i > 0) {
            if (cleanTomb(inode, iParent) == Action.OK) {
                break;
            }
            i--;
        }
        return Action.OK;
    }
    if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
        // last client to leave this node, AND there are no downstream children, remove via TNode tomb
        if (inode == this.root) {
            return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
        }
        TNode tnode = new TNode(cnode.getToken());
        return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
    } else if (cnode.contains(clientId) && topic.isEmpty()) {
        CNode updatedCnode = cnode.copy();
        updatedCnode.removeSubscriptionsFor(clientId);
        return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
    } else {
        //someone else already removed
        return Action.OK;
    }
}

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

There is a maximum of 1 thread per CPU-core doing subscriptions, so there is no way for any "recursion" to increase.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

The machine has 24 physical processors (24 U), each with four cores and 2-way hyper-threading, that is, 24 x 4 x 2 = 192 threads.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

During cleanTomb, multiple threads work on the same parent node at the same time. Once a thread fails to remove its child from that parent node, the recursion continues.
My tree is root->app->users->(user1, user2, user3, user4, ...). The users node has many children. All threads work on the parent node users while removing different children: thread 0 removes user1, thread 1 removes user2, and so on.
At any given moment only one thread removes its child successfully. The other threads fail and keep recursing until they succeed.
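
The contention pattern described here can be sketched in isolation. The example below is an illustration under assumptions, not moquette code: `FlatParentContention`, `removeChild` and `run` are hypothetical names, and an immutable list held in an `AtomicReference` stands in for the parent's node. Each thread removes a different child, but every removal copies the whole child list and then swaps it in with a single compareAndSet on the same reference.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Toy model (NOT moquette code) of many threads removing children of one flat parent.
final class FlatParentContention {
    // Immutable child list swapped by CAS; stands in for the parent's main node.
    static final AtomicReference<List<String>> users = new AtomicReference<>();
    // Number of compareAndSet calls that lost the race and had to be repeated.
    static final AtomicLong failedCas = new AtomicLong();

    static void removeChild(String name) {
        while (true) {
            List<String> orig = users.get();
            List<String> updated = new ArrayList<>(orig); // copy cost grows with the fan-out
            updated.remove(name);
            if (users.compareAndSet(orig, Collections.unmodifiableList(updated))) {
                return;
            }
            failedCas.incrementAndGet(); // another thread changed the parent first: repeat
        }
    }

    // Starts one remover thread per child and returns how many children are left.
    static int run(int threads) {
        List<String> initial = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            initial.add("user" + i);
        }
        users.set(Collections.unmodifiableList(initial));
        failedCas.set(0);
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            final String name = "user" + i;
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all removers at once to provoke conflicts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                removeChild(name);
            });
            workers.add(t);
            t.start();
        }
        start.countDown();
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return users.get().size();
    }

    public static void main(String[] args) {
        System.out.println("children left: " + run(16) + ", failed CAS attempts: " + failedCas.get());
    }
}
```

Every thread eventually succeeds, because each failed attempt means some other thread made progress. The number of failed attempts varies from run to run and depends on how long the copy takes relative to the rest of the loop.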

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

That sounds like a great setup for gathering some statistics!
Count the required tries and store them in a little List of AtomicLongs. If it takes two tries, increase the Long at position 1 (don't want to track 0-tries).
Then have a separate thread that outputs the statistics once every minute or so... That would give us some insight into how big the problem really is.
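
A minimal sketch of such a counter, under stated assumptions: `RetryStats` and its methods are hypothetical names, nothing here exists in moquette, and an `AtomicLongArray` is used in place of the List of AtomicLongs for brevity. An operation that needed n tries is counted at position n - 1, and everything beyond the last position is lumped into the last bucket.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical helper (not part of moquette): histogram of tries per operation.
final class RetryStats {
    private static final int BUCKETS = 16;
    // counts[n - 1] = number of operations that needed n tries; last bucket is "BUCKETS or more"
    private final AtomicLongArray counts = new AtomicLongArray(BUCKETS);

    // Record one finished operation that needed the given number of tries (>= 1).
    void record(int tries) {
        if (tries < 1) {
            return; // zero tries are not tracked
        }
        counts.incrementAndGet(Math.min(tries - 1, BUCKETS - 1));
    }

    long countAt(int position) {
        return counts.get(position);
    }

    // One-line summary of all non-empty buckets.
    String snapshot() {
        StringBuilder sb = new StringBuilder("tries->count:");
        for (int i = 0; i < counts.length(); i++) {
            long c = counts.get(i);
            if (c > 0) {
                sb.append(' ').append(i + 1).append("->").append(c);
            }
        }
        return sb.toString();
    }

    // Prints the summary periodically from a daemon thread; the caller may shut it down.
    ScheduledExecutorService startReporter(long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "retry-stats");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleAtFixedRate(() -> System.out.println(snapshot()),
                periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }
}
```

To use it, a loop such as the do/while in removeFromTree would keep a local try counter, call `record(tries)` once the loop exits, and `startReporter(60)` would be called once at startup.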

In 0.17 this problem should be much less than in 0.16, since I sped up the copy procedure a lot and the copy procedure is the slow factor in the parallel process. Faster copies means less chance of a conflict.

Note that they do not recurse, they only repeat.

I guess the worst case in your setup happens when many users "log off" and many userX nodes are removed in quick succession. The root of the issue is the very flat tree you have. We may have to add a method to artificially deepen the tree in your case.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

1. If cleanTomb fails while the remove function is running, REPEAT is returned, which means the recursion continues. By then the CNode has already been replaced by a TNode. When the node is found to be a TNode instance, the following branch is taken:

if (cnode instanceof TNode) {
    // this inode is a tomb, has no clients and should be cleaned up
    // Because we implemented cleanTomb below, this should be rare, but possible
    // Consider calling cleanTomb here too
    return Action.OK;
}

2. If we leave this logic unchanged, OK is returned and the recursion ends. If we instead call cleanTomb here and it fails, the recursion continues until it succeeds:

if (cnode instanceof TNode) {
    // this inode is a tomb, has no clients and should be cleaned up
    // Because we implemented cleanTomb below, this should be rare, but possible
    // Consider calling cleanTomb here too
    int i = 3;
    while (i > 0) {
        if (cleanTomb(inode, iParent) == Action.OK) {
            break;
        }
        i--;
    }
    return Action.OK;
}

3. The tree is flat: all devices are mounted under the users node. When the connection channel between the MQTT server and a device becomes inactive, the server automatically unsubscribes the device's topics. By default, a connection is dropped if it has been inactive for 10 seconds. This was added to keep the number of nodes from growing without ever being deleted. However, once many devices become inactive at the same moment, the cleanTomb function runs concurrently for all of them. At that point only one thread's compareAndSet on the shared parent node succeeds. The compareAndSet calls of the other threads fail, and the recursion continues until the modification succeeds.

void handleConnectionLost() {
    String clientID = NettyUtils.clientID(channel);
    if (clientID == null || clientID.isEmpty()) {
        return;
    }
    LOG.info("Notifying connection lost event");
    if (bindedSession.hasWill()) {
        postOffice.fireWill(bindedSession.getWill());
    }
    if (bindedSession.isClean()) {
        LOG.debug("Remove session for client");
        sessionRegistry.unsubscribe(bindedSession);
        sessionRegistry.remove(bindedSession);
    } else {
        bindedSession.disconnect();
    }
    connected = false;
    //dispatch connection lost to intercept.
    String userName = NettyUtils.userName(channel);
    postOffice.dispatchConnectionLost(clientID, userName);
    LOG.trace("dispatch disconnection: userName={}", userName);
}

4. The probability that a deletion fails is low. However, every node that fails to be deleted stays behind, so the number of TNodes grows as time goes by. This is why the problem only occurs after the system has run for about two months: by then more than 5 million TNodes have accumulated, resulting in high CPU usage.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

There is no way to avoid the flatness of the tree, so it is better to have a separate thread that deletes the TNodes.
In addition, statistics logging can be added to count the number of compareAndSet failures, which makes faults easier to locate.

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

> There is no way to avoid the flatness of the tree built , so it is better to have a separate thread to delete the TNode node. In addition, some statistics logs can be added to collect statistics on the number of CompareAndSet failures, which facilitates fault locating.

It is always possible to internally make a tree deeper, for instance by separating all nodes based on the first character, then by the second, etc. Having a separate thread won't make things more efficient, since that thread still has to contend with the other threads creating new nodes.
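
As a rough illustration of the deepening idea only: the sketch below turns one token into a path of single-character levels followed by the full token. `TokenDeepener` and `deepen` are hypothetical names, this is not how moquette stores topics, and the number of levels is an arbitrary parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not moquette code): spreads the children of one flat level
// over intermediate levels keyed by the leading characters of the token.
final class TokenDeepener {
    // deepen("alice", 2) yields the path [a, l, alice].
    static List<String> deepen(String token, int levels) {
        List<String> path = new ArrayList<>();
        int split = Math.min(Math.max(levels, 0), token.length());
        for (int i = 0; i < split; i++) {
            path.add(String.valueOf(token.charAt(i))); // one intermediate level per character
        }
        path.add(token); // the leaf keeps the full token
        return path;
    }

    public static void main(String[] args) {
        System.out.println(deepen("alice", 2));
        System.out.println(deepen("user1", 2));
    }
}
```

One caveat with splitting on leading characters: tokens such as user1, user2, user3 all share the prefix "us", so they would still end up under the same parent. For names like these, the split would have to use the trailing characters or a hash of the token instead.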

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

In that case, if only one thread can delete its TNode and the other threads fail, how should the remaining TNodes be handled? Are they simply never deleted?
Could you send the latest modification?
The change proposed in the following comment may not ensure that all TNodes are deleted:
#841 (comment)

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

My working version with the fixes I made above can be found at: https://github.com/FraunhoferIOSB/moquette/tree/fix_841_cleanTomb_reference_loss

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

OK, thank you. Will this fix be merged into the main branch?

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

It will probably get merged once Andsel has had time to look at it. In the meantime it would help if you could test it to see if the problem is actually fixed.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

OK, thank you. However, the test period will be long. The problem only occurs occasionally, so I need to observe the system for a long time, capturing memory profiles to obtain the number of TNodes and check whether it increases.

from moquette.

JasontieZhu avatar JasontieZhu commented on July 17, 2024

I found that the fix has some errors and hasn't been merged:
Java CI with Maven / Test JDK 11, ubuntu-20.04 (pull_request) Failing after 11m

Error: Errors:
Error: ConnectTest.receiveInflightPublishesAfterAReconnect:101 » Runtime Cannot recei...
Error: MessageExpirationTest.givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanTheExipiryIsNotPublished:156 » Runtime

from moquette.

hylkevds avatar hylkevds commented on July 17, 2024

Those build failures are caused by a timeout that is set too strict for the (quite slow) GitHub build systems. That's why it fails only sometimes and not always.
#843 should fix it.

from moquette.
