
Comments (24)

Barenboim commented on June 8, 2024

Hello, did all the writes on the database side succeed?

from workflow.

Barenboim commented on June 8, 2024

I suggest printing some information in task_callback so you can see how far the tasks have progressed.

lrh450330 commented on June 8, 2024
void task_callback(WFMySQLTask* task)
{
	// Step 1: check the task state
	if (task->get_state() != WFT_STATE_SUCCESS)
	{
		fprintf(stderr, "task error message: %s\n",
			WFGlobal::get_error_string(task->get_state(),
				task->get_error()));
	}
}

This is task_callback. No error message is printed, and 985 rows were written to the database.

Barenboim commented on June 8, 2024

So you mean it got stuck with 15 rows still remaining?

lrh450330 commented on June 8, 2024

WFMySQLTask* insertTask = connMysql->create_query_task(l_querySql, task_callback);
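Rows also go missing when I use a combined (multi-row) insert, and I don't know where the dozen or so missing rows went. I found this while debugging with only a single write. When I dumped l_querySql to a file and executed it in a database tool, all 1000 rows were written. So that is one problem.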

Barenboim commented on June 8, 2024

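Print something in the callback of your COMMIT statement to see whether COMMIT actually runs. If it never ran, then in theory your database should not contain a single row.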

lrh450330 commented on June 8, 2024
void task_callback(WFMySQLTask* task)
{
	// Step 1: check the task state
	if (task->get_state() != WFT_STATE_SUCCESS)
	{
		fprintf(stderr, "task error message: %s\n",
			WFGlobal::get_error_string(task->get_state(),
				task->get_error()));
	}
}

void commit_callback(WFMySQLTask* task)
{
	// Step 1: check the task state
	if (task->get_state() != WFT_STATE_SUCCESS)
	{
		fprintf(stderr, "commit error message: %s\n",
			WFGlobal::get_error_string(task->get_state(),
				task->get_error()));
	}
}

void AxDocStore::push_data(StoreData& data)
{
	std::vector<StoreData> moveData;
	{
		std::lock_guard<std::mutex> lock(m_mutex);
		m_memData.emplace_back(std::move(data));

		if (999 < m_memData.size()) {
			moveData = std::move(m_memData);
		}
	}

	if (!moveData.empty())
	{
		std::string insertSql = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES ";
		for (const auto& data : moveData) {
			insertSql += "(" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "'), ";
		}
		insertSql.pop_back(); // remove the trailing space
		insertSql.pop_back(); // remove the trailing comma
		insertSql.append(";");

		store_record(insertSql);

		WFMySQLConnection* connMysql = NULL;
		{
			std::lock_guard<std::mutex> lock(m_mutex);
			auto connItem = m_msqlCon.find(GetCurrentThreadId());
			if (connItem == m_msqlCon.end()) {
				connMysql = new WFMySQLConnection(GetCurrentThreadId());
				if (connMysql) {
					connMysql->init(m_mysqlURL);
				}
				m_msqlCon[GetCurrentThreadId()] = connMysql;
			}
			else {
				connMysql = connItem->second;
			}
		}

		static bool written = false;
		if (connMysql) {
			std::lock_guard<std::mutex> lock(m_mutex);
			if (written) return;
			written = true;

			// Begin the transaction
			const char* beginQuery = "BEGIN;";
			WFMySQLTask* beginTask = connMysql->create_query_task(beginQuery, task_callback);

			// Create the series
			auto serieswork = Workflow::create_series_work(beginTask, nullptr);

			// Insert the data
			WFMySQLTask* insertTask = connMysql->create_query_task(insertSql, task_callback);
			serieswork->push_back(insertTask);
			//for (const auto& data : moveData) {
			//	std::string insertQuery = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES (" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "');";
			//	WFMySQLTask* insertTask = connMysql->create_query_task(insertQuery.c_str(), task_callback);
			//	serieswork->push_back(insertTask);
			//}

			// Commit the transaction
			const char* commitQuery = "COMMIT;";
			WFMySQLTask* commitTask = connMysql->create_query_task(commitQuery, commit_callback);
			serieswork->push_back(commitTask);

			// Create a WaitGroup
			WFFacilities::WaitGroup wait_group(1);
			serieswork->set_callback([&wait_group](const SeriesWork*) {
					wait_group.done();
				});

			// Start the SeriesWork
			serieswork->start();

			// Wait for the SeriesWork to finish
			wait_group.wait();
		}
	}
}

The COMMIT callback reports success, but querying the database shows that no rows were written.
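The pattern in this code starts the series and then blocks in wait_group.wait() until the series callback calls done(). That pattern can be modelled with the standard library alone. The sketch below is a hypothetical stand-in and is not workflow's WFFacilities::WaitGroup. In it, a std::thread plays the asynchronous query, and the worker's call to done() plays the series callback.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a wait group: wait() blocks until done()
// has been called `count` times. Not workflow's WFFacilities::WaitGroup.
class SimpleWaitGroup {
public:
    explicit SimpleWaitGroup(int count) : remaining_(count) { assert(count >= 0); }

    void done() {
        std::lock_guard<std::mutex> lock(mu_);
        if (remaining_ > 0) --remaining_;
        cv_.notify_all();  // notify while holding the lock
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return remaining_ == 0; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int remaining_;
};

// Models "start async work, then block until its completion callback runs".
// The worker thread plays the query; its call to wg.done() plays the
// series callback. If done() were never reached, wg.wait() would never return.
int run_and_wait(const std::function<int()>& work) {
    SimpleWaitGroup wg(1);
    int result = 0;
    std::thread worker([&] {
        result = work();  // the asynchronous "query"
        wg.done();        // the "series callback"
    });
    wg.wait();            // like wait_group.wait() in push_data
    worker.join();
    return result;
}
```

If the completion never fires, the caller stays blocked in wait() indefinitely. That behaviour is consistent with the hang in wait_group.wait() reported later in this thread, although it does not prove the cause.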

Barenboim commented on June 8, 2024

How about removing both BEGIN and COMMIT and trying a direct write?

Barenboim commented on June 8, 2024
Quoting lrh450330: "This is task_callback. No error message is printed, and 985 rows were written to the database."

Didn't you say 985 rows were written?

lrh450330 commented on June 8, 2024

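I just tried removing both BEGIN and COMMIT. task_callback reports no error, but there is still no data in the database. That is the result when the code is limited to a single execution. If I lift the run-once restriction, data does get written, but far fewer rows than I actually inserted.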

Barenboim commented on June 8, 2024

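So the program does finish normally. The problem is only that the data in the database doesn't match what you wrote, and many rows are missing. Is that right?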

Barenboim commented on June 8, 2024

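Could you provide a minimal failing case? For example, a single INSERT statement that executes but whose rows can't be found in the database afterwards.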

lrh450330 commented on June 8, 2024

Yes, many records are missing. I just tried with two records, and that worked correctly: INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES (4, 1819, 1, '2023-06-19 00:00:00.041978300', 16871040000419784, '"L0100000!000_CA:2.3,F_OP_ROLE:2,F_OP_USER:9988,F_OP_BRANCH:999,F_SESSION:3k4Ad#@40@25+^p5x6)@3d@3D,F_CHANNEL:6,F_OP_SITE:', 122, 'L0100000', 'L0100000'), (1, 155, 1, '2023-06-19 00:00:00.027058300', 16871040000270584, '"L0300003!000_CA:2.3,F_OP_ROLE:2,F_OP_USER:9988,F_OP_BRANCH:999,F_SESSION:3k4Ad#@40@25+^p5x6)@3d@3D,F_CHANNEL:8,F_OP_SITE:__,MARKET:1', 133, 'L0300003', 'L0300003');

lrh450330 commented on June 8, 2024

Here is the test code:

		static bool written = false;
		if (connMysql) {
			std::lock_guard<std::mutex> lock(m_mutex);
			if (written) return;
			written = true;

			insertSql = "";
			std::ifstream inFile("test.txt");
			std::string fileContent;
			if (inFile.is_open()) {
				std::string line;
				while (std::getline(inFile, line)) {
					insertSql += line + '\n';
				}
				inFile.close();
			}
			else {
				fprintf(stderr, "Unable to open file\n");
			}
			fprintf(stderr, "%s", insertSql.c_str()); // never pass data as the format string

			// Begin the transaction
			//const char* beginQuery = "BEGIN;";
			//WFMySQLTask* beginTask = connMysql->create_query_task(beginQuery, task_callback);

			// Insert the data
			WFMySQLTask* insertTask = connMysql->create_query_task(insertSql, task_callback);
			//serieswork->push_back(insertTask);
			//for (const auto& data : moveData) {
			//	std::string insertQuery = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES (" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "');";
			//	WFMySQLTask* insertTask = connMysql->create_query_task(insertQuery.c_str(), task_callback);
			//	serieswork->push_back(insertTask);
			//}

			// Create the series
			auto serieswork = Workflow::create_series_work(insertTask, nullptr);

			// Commit the transaction
			//const char* commitQuery = "COMMIT;";
			//WFMySQLTask* commitTask = connMysql->create_query_task(commitQuery, commit_callback);
			//serieswork->push_back(commitTask);

			// Create a WaitGroup
			WFFacilities::WaitGroup wait_group(1);
			serieswork->set_callback([&wait_group](const SeriesWork*) {
					wait_group.done();
				});

			// Start the SeriesWork
			serieswork->start();

			// Wait for the SeriesWork to finish
			wait_group.wait();
		}

When test.txt contains only the two records above, the data shows up in the database. With a 1000-record test.txt, however, no data appears in the database.

lrh450330 commented on June 8, 2024

Also, when I change 999 to 99, so that only 100 records are written at a time, everything works correctly.
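A 100-row batch works here while a 1000-row batch does not. It therefore helps to make the batch size a parameter instead of hard-coding 999 or 99 in the flush condition. Below is a minimal sketch of that idea. It assumes each row has already been rendered as a "(...)" tuple, and build_insert_batches is a hypothetical helper, not part of workflow. Because the separator is written only between rows, the two pop_back() calls are no longer needed.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: group pre-rendered row tuples such as "(1, 'a')"
// into INSERT statements of at most batch_size rows each.
std::vector<std::string> build_insert_batches(const std::string& prefix,
                                              const std::vector<std::string>& rows,
                                              std::size_t batch_size) {
    assert(batch_size > 0);
    std::vector<std::string> statements;
    for (std::size_t start = 0; start < rows.size(); start += batch_size) {
        const std::size_t end = std::min(start + batch_size, rows.size());
        std::string sql = prefix;
        for (std::size_t i = start; i < end; ++i) {
            if (i != start) sql += ", ";  // separator only between rows
            sql += rows[i];
        }
        sql += ";";
        statements.push_back(std::move(sql));
    }
    return statements;
}
```

For example, three rows with a batch size of 2 produce two statements, the second containing a single row.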

lrh450330 commented on June 8, 2024

Writing in batches of 100, the code hung after more than 1 million records had been written, but the database contains only 590,000 records.

lrh450330 commented on June 8, 2024

The parallel stacks view shows it is stuck in wait_group.wait();. I adjusted the code as follows:

std::atomic<int64_t> g_wrtotal{0};

void task_callback(WFMySQLTask* task)
{
	// Step 1: check the task state
	if (task->get_state() != WFT_STATE_SUCCESS)
	{
		fprintf(stderr, "task error message: %s\n",
			WFGlobal::get_error_string(task->get_state(),
				task->get_error()));
	}
	else
	{
		g_wrtotal += 100;
	}
}

void commit_callback(WFMySQLTask* task)
{
	// Step 1: check the task state
	if (task->get_state() != WFT_STATE_SUCCESS)
	{
		fprintf(stderr, "commit error message: %s\n",
			WFGlobal::get_error_string(task->get_state(),
				task->get_error()));
	}
}

// 16 cores, 16 writer threads, one WFMySQLConnection per thread
void AxDocStore::push_data(StoreData& data)
{
	std::vector<StoreData> moveData;
	{
		std::lock_guard<std::mutex> lock(m_mutex);
		m_memData.emplace_back(std::move(data));

		if (99 < m_memData.size()) {
			moveData = std::move(m_memData);
		}
	}

	if (!moveData.empty())
	{
		std::string insertSql = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES ";
		for (const auto& data : moveData) {
			insertSql += "(" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "'), ";
		}
		insertSql.pop_back(); // remove the trailing space
		insertSql.pop_back(); // remove the trailing comma
		insertSql.append(";");

		//store_record(insertSql);

		WFMySQLConnection* connMysql = NULL;
		{
			std::lock_guard<std::mutex> lock(m_mutex);
			auto connItem = m_msqlCon.find(GetCurrentThreadId());
			if (connItem == m_msqlCon.end()) {
				connMysql = new WFMySQLConnection(GetCurrentThreadId());
				if (connMysql) {
					connMysql->init(m_mysqlURL);
				}
				m_msqlCon[GetCurrentThreadId()] = connMysql;
			}
			else {
				connMysql = connItem->second;
			}
		}

		if (connMysql) {
			// Insert the data
			WFMySQLTask* insertTask = connMysql->create_query_task(insertSql, task_callback);

			// Create the series
			auto serieswork = Workflow::create_series_work(insertTask, nullptr);
			// Create a WaitGroup
			WFFacilities::WaitGroup wait_group(1);
			serieswork->set_callback([&wait_group](const SeriesWork*) {
					wait_group.done();
				});

			// Start the SeriesWork
			serieswork->start();
			// Wait for the SeriesWork to finish
			wait_group.wait();
		}
	}
}

Barenboim commented on June 8, 2024

It looks like you are hitting this on Windows. Do you have a Linux or macOS environment where you could try it?

lrh450330 commented on June 8, 2024

Linux is rather troublesome. I would need to extract the database-insertion code and adapt it.

Barenboim commented on June 8, 2024

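I can't see anything wrong in the code. Waiting inside a compute thread isn't good practice, but it is unrelated to the requests getting stuck.
We looked at your stack, and it is stuck in a Windows asynchronous call. We really don't have an environment for debugging Windows issues. If you are familiar with Windows, could you help us look into it? Many thanks! Windows support has always been short of manpower on our side, and we apologize for that.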
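Blocking after every batch is one way to keep producers from outrunning the database. A common alternative, which nobody proposes in this thread, is to cap the number of batches in flight. With a cap, the producer waits only when the cap is reached, and each batch's completion callback frees a slot. InFlightLimiter is a hypothetical C++17 sketch of that idea. C++20 code could use std::counting_semaphore instead.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical counting limiter: acquire() blocks only while `limit`
// batches are already in flight; release() is meant to be called from
// the completion callback of a batch.
class InFlightLimiter {
public:
    explicit InFlightLimiter(std::size_t limit) : limit_(limit) { assert(limit > 0); }

    void acquire() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return in_flight_ < limit_; });
        ++in_flight_;
    }

    // Non-blocking variant: returns false if the cap is already reached.
    bool try_acquire() {
        std::lock_guard<std::mutex> lock(mu_);
        if (in_flight_ >= limit_) return false;
        ++in_flight_;
        return true;
    }

    void release() {
        std::lock_guard<std::mutex> lock(mu_);
        if (in_flight_ > 0) --in_flight_;
        cv_.notify_one();  // wake one waiting producer
    }

    std::size_t in_flight() const {
        std::lock_guard<std::mutex> lock(mu_);
        return in_flight_;
    }

private:
    mutable std::mutex mu_;
    std::condition_variable cv_;
    std::size_t limit_;
    std::size_t in_flight_ = 0;
};
```

In push_data, this would mean calling acquire() before serieswork->start() and release() from the series callback, instead of waiting on a WaitGroup. This wiring is an untested assumption about how the limiter could fit into workflow. It is not a fix for the hang itself.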

lrh450330 commented on June 8, 2024

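The machine uses a mechanical hard disk, so without waiting, MySQL stores data far more slowly than I push it. Time is tight, so to deliver first I switched to the MySQL client SDK API for storing the data. The cause of the wrong row count has been found: a few of the 1000 rows contained special characters. However, the callback check task->get_state() != WFT_STATE_SUCCESS did not detect it. I'll look into it later.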
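The missing rows were traced to special characters in the data, so values spliced into the SQL string need escaping. Real code should use the client library's escaping or prepared statements. One example is the MySQL C API's mysql_real_escape_string(), which accounts for the connection's character set. The escape_sql_literal helper below is only a hypothetical sketch. It escapes the same characters that function documents, and it assumes the NO_BACKSLASH_ESCAPES SQL mode is off.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: backslash-escape the characters documented for
// mysql_real_escape_string() (NUL, \n, \r, \, ', ", Ctrl+Z).
// Ignores the connection character set; assumes NO_BACKSLASH_ESCAPES is off.
std::string escape_sql_literal(const std::string& in) {
    std::string out;
    out.reserve(in.size() * 2);
    for (char c : in) {
        switch (c) {
            case '\0':   out += "\\0";  break;
            case '\n':   out += "\\n";  break;
            case '\r':   out += "\\r";  break;
            case '\\':   out += "\\\\"; break;
            case '\'':   out += "\\'";  break;
            case '"':    out += "\\\""; break;
            case '\x1a': out += "\\Z";  break;  // Ctrl+Z
            default:     out += c;      break;
        }
    }
    return out;
}
```

In push_data, it would wrap each string field before concatenation, for example escape_sql_literal(data.raw_body).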

Barenboim commented on June 8, 2024

Quoting lrh450330: "The cause of the wrong row count has been found: a few of the 1000 rows contained special characters. However, the callback check task->get_state() != WFT_STATE_SUCCESS did not detect it."

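The error from the special-character problem can be obtained. task->get_state() only reports the state of the task: in this case, as long as one MySQL exchange completes successfully, the state is success. To see the status of the actual statement results, check the result set.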
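The maintainer's point is that there are two levels of status. The task state says whether the exchange with the server completed. Each statement's outcome, either OK with an affected-row count or an error, is carried in the result set. The types below are a hypothetical model of that split and are not workflow's API. Counting affected rows this way would also be more accurate than adding a fixed 100 per successful task, as g_wrtotal does in the code above.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical model (not workflow's API): a batch yields a transport
// state plus one server reply per statement.
enum class TransportState { Success, Failed };

struct StatementReply {
    bool is_error;           // server answered with an error
    std::uint64_t affected;  // rows affected when the reply is OK
    std::string error_msg;
};

struct BatchOutcome {
    TransportState state;
    std::vector<StatementReply> replies;
};

// Count rows actually written. A batch whose transport succeeded can still
// contain failed statements, so checking only `state` is not enough.
// Error messages are appended to `errors`.
std::uint64_t count_written(const BatchOutcome& outcome,
                            std::vector<std::string>& errors) {
    if (outcome.state != TransportState::Success) {
        errors.push_back("transport failure");
        return 0;
    }
    std::uint64_t rows = 0;
    for (const auto& r : outcome.replies) {
        if (r.is_error)
            errors.push_back(r.error_msg);
        else
            rows += r.affected;
    }
    return rows;
}
```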

Barenboim commented on June 8, 2024

@lrh450330 Hello, apart from the request failures caused by illegal characters, are there any other problems? Does checking the MySQLResult status solve the issue?

lrh450330 commented on June 8, 2024

The illegal characters explain the request failures, but the key issue, the deadlock, still hasn't been pinned down.
