Comments (24)
你好,请问数据库那边写入都成功了吗?
from workflow.
建议task_callback里输出一些信息,知道任务运行到什么地方了。
from workflow.
void task_callback(WFMySQLTask* task)
{
// step-1. 判断任务状态
if (task->get_state() != WFT_STATE_SUCCESS)
{
fprintf(stderr, "task error message: %s\n",
WFGlobal::get_error_string(task->get_state(),
task->get_error()));
}
}
这是task_callback,没有报错信息。数据库写入了985条
from workflow.
就说是,还剩15条的时候,卡住了?
from workflow.
WFMySQLTask* insertTask = connMysql->create_query_task(l_querySql, task_callback);
使用合并写入的时候也少数据,不知道十几条跑哪去了,我调试只写一次的时候发现的问题,我把l_querySql落到文件放到工具上执行写入的是1000条,这是一个问题。
from workflow.
你COMMIT那条语句,callback里打印一下吧,看看有没有执行到COMMIT。如果没有执行,理论上你数据库里一条也没有啊。
from workflow.
void task_callback(WFMySQLTask* task)
{
// step-1. 判断任务状态
if (task->get_state() != WFT_STATE_SUCCESS)
{
fprintf(stderr, "task error message: %s\n",
WFGlobal::get_error_string(task->get_state(),
task->get_error()));
}
}
void commit_callback(WFMySQLTask* task)
{
// step-1. 判断任务状态
if (task->get_state() != WFT_STATE_SUCCESS)
{
fprintf(stderr, "commit error message: %s\n",
WFGlobal::get_error_string(task->get_state(),
task->get_error()));
}
}
void AxDocStore::push_data(StoreData& data)
{
std::vector<StoreData> moveData;
{
std::lock_guard<std::mutex> lock(m_mutex);
m_memData.emplace_back(std::move(data));
if (999 < m_memData.size()) {
moveData = std::move(m_memData);
}
}
if (!moveData.empty())
{
std::string insertSql = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES ";
for (const auto& data : moveData) {
insertSql += "(" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "'), ";
}
insertSql.pop_back(); // 删除最后一个逗号
insertSql.pop_back(); // 删除最后一个空格
insertSql.append(";");
store_record(insertSql);
WFMySQLConnection* connMysql = NULL;
{
std::lock_guard<std::mutex> lock(m_mutex);
auto connItem = m_msqlCon.find(GetCurrentThreadId());
if (connItem == m_msqlCon.end()) {
connMysql = new WFMySQLConnection(GetCurrentThreadId());
if (connMysql) {
connMysql->init(m_mysqlURL);
}
m_msqlCon[GetCurrentThreadId()] = connMysql;
}
else {
connMysql = connItem->second;
}
}
static bool written = false;
if (connMysql) {
std::lock_guard<std::mutex> lock(m_mutex);
if (written) return;
written = true;
// 开始事务
const char* beginQuery = "BEGIN;";
WFMySQLTask* beginTask = connMysql->create_query_task(beginQuery, task_callback);
// 创建任务
auto serieswork = Workflow::create_series_work(beginTask, nullptr);
// 插入数据
WFMySQLTask* insertTask = connMysql->create_query_task(insertSql, task_callback);
serieswork->push_back(insertTask);
//for (const auto& data : moveData) {
// std::string insertQuery = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES (" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "');";
// WFMySQLTask* insertTask = connMysql->create_query_task(insertQuery.c_str(), task_callback);
// serieswork->push_back(insertTask);
//}
// 提交事务
const char* commitQuery = "COMMIT;";
WFMySQLTask* commitTask = connMysql->create_query_task(commitQuery, commit_callback);
serieswork->push_back(commitTask);
// 创建WaitGroup
WFFacilities::WaitGroup wait_group(1);
serieswork->set_callback([&wait_group](const SeriesWork*) {
wait_group.done();
});
// 开始SeriesWork
serieswork->start();
// 等待SeriesWork完成
wait_group.wait();
}
}
}
COMMIT的回调是成功的,但是数据库查询没有发现写入数据。
from workflow.
要不你把BEGIN和COMMIT都去了,直接写试一下?
from workflow.
void task_callback(WFMySQLTask* task) { // step-1. 判断任务状态 if (task->get_state() != WFT_STATE_SUCCESS) { fprintf(stderr, "task error message: %s\n", WFGlobal::get_error_string(task->get_state(), task->get_error())); } }这是task_callback,没有报错信息。数据库写入了985条
你不是说写入了985条吗?
from workflow.
我刚才试了一下把BEGIN和COMMIT都去,task_callback没有报错,但是数据库也没有数据。这是代码只执行一次的时候的结果,如果放开一次限制,能写入数据,但是远远小于我实际插入的数据。
from workflow.
所以,程序是能正常结束的,只是数据库里的数据,和你写入的不太一样,少了很多条是吧?
from workflow.
你能不能给出一个最小的失败case。比如,只有一条INSERT语句,执行,然后库里查不出来的。
from workflow.
是的,少了很多条记录。刚才试了一下两条记录是对的:INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES (4, 1819, 1, '2023-06-19 00:00:00.041978300', 16871040000419784, '"L0100000!000_CA:2.3,F_OP_ROLE:2,F_OP_USER:9988,F_OP_BRANCH:999,F_SESSION:3k4Ad#@40@25+^p5x6)@3d@3D,F_CHANNEL:6,F_OP_SITE:', 122, 'L0100000', 'L0100000'), (1, 155, 1, '2023-06-19 00:00:00.027058300', 16871040000270584, '"L0300003!000_CA:2.3,F_OP_ROLE:2,F_OP_USER:9988,F_OP_BRANCH:999,F_SESSION:3k4Ad#@40@25+^p5x6)@3d@3D,F_CHANNEL:8,F_OP_SITE:__,MARKET:1', 133, 'L0300003', 'L0300003');
from workflow.
测试代码如下:
static bool written = false;
if (connMysql) {
std::lock_guard<std::mutex> lock(m_mutex);
if (written) return;
written = true;
insertSql = "";
std::ifstream inFile("test.txt");
std::string fileContent;
if (inFile.is_open()) {
std::string line;
while (std::getline(inFile, line)) {
insertSql += line + '\n';
}
inFile.close();
}
else {
fprintf(stderr, "Unable to open file");
}
fprintf(stderr, insertSql.c_str());
// 开始事务
//const char* beginQuery = "BEGIN;";
//WFMySQLTask* beginTask = connMysql->create_query_task(beginQuery, task_callback);
// 插入数据
WFMySQLTask* insertTask = connMysql->create_query_task(insertSql, task_callback);
//serieswork->push_back(insertTask);
//for (const auto& data : moveData) {
// std::string insertQuery = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES (" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "');";
// WFMySQLTask* insertTask = connMysql->create_query_task(insertQuery.c_str(), task_callback);
// serieswork->push_back(insertTask);
//}
// 创建任务
auto serieswork = Workflow::create_series_work(insertTask, nullptr);
// 提交事务
//const char* commitQuery = "COMMIT;";
//WFMySQLTask* commitTask = connMysql->create_query_task(commitQuery, commit_callback);
//serieswork->push_back(commitTask);
// 创建WaitGroup
WFFacilities::WaitGroup wait_group(1);
serieswork->set_callback([&wait_group](const SeriesWork*) {
wait_group.done();
});
// 开始SeriesWork
serieswork->start();
// 等待SeriesWork完成
wait_group.wait();
}
当test.txt只放了上述两条记录的时候,数据库可以查到数据,但是到1000条记录的txt的时候,数据库没有数据
from workflow.
另外,当我把999改成99,也就是一次性只写入100条的记录的时候是正常的
from workflow.
批量每次100写入,记录写了100多万记录的时候代码卡死了,但是数据库记录只有59万条
from workflow.
并行堆栈显示卡在了wait_group.wait();调整代码如下:
std::atomic<int64_t> g_wrtotal = 0;
void task_callback(WFMySQLTask* task)
{
// step-1. 判断任务状态
if (task->get_state() != WFT_STATE_SUCCESS)
{
fprintf(stderr, "task error message: %s\n",
WFGlobal::get_error_string(task->get_state(),
task->get_error()));
}
else
{
g_wrtotal += 100;
}
}
void commit_callback(WFMySQLTask* task)
{
// step-1. 判断任务状态
if (task->get_state() != WFT_STATE_SUCCESS)
{
fprintf(stderr, "commit error message: %s\n",
WFGlobal::get_error_string(task->get_state(),
task->get_error()));
}
}
// 16核心开了16条线程写数据,每条线程一个WFMySQLConnection
void AxDocStore::push_data(StoreData& data)
{
std::vector<StoreData> moveData;
{
std::lock_guard<std::mutex> lock(m_mutex);
m_memData.emplace_back(std::move(data));
if (99 < m_memData.size()) {
moveData = std::move(m_memData);
}
}
if (!moveData.empty())
{
std::string insertSql = "INSERT INTO request_msg_table (id, pos, sub_id, time_stamp, time_tick, raw_body, raw_len, api_name, parallel_info) VALUES ";
for (const auto& data : moveData) {
insertSql += "(" + std::to_string(data.id) + ", " + std::to_string(data.pos) + ", " + std::to_string(data.sub_id) + ", '" + data.time_stamp + "', " + std::to_string(data.time_tick) + ", '" + data.raw_body + "', " + std::to_string(data.raw_len) + ", '" + data.api_name + "', '" + data.parallel_info + "'), ";
}
insertSql.pop_back(); // 删除最后一个逗号
insertSql.pop_back(); // 删除最后一个空格
insertSql.append(";");
//store_record(insertSql);
WFMySQLConnection* connMysql = NULL;
{
std::lock_guard<std::mutex> lock(m_mutex);
auto connItem = m_msqlCon.find(GetCurrentThreadId());
if (connItem == m_msqlCon.end()) {
connMysql = new WFMySQLConnection(GetCurrentThreadId());
if (connMysql) {
connMysql->init(m_mysqlURL);
}
m_msqlCon[GetCurrentThreadId()] = connMysql;
}
else {
connMysql = connItem->second;
}
}
if (connMysql) {
// 插入数据
WFMySQLTask* insertTask = connMysql->create_query_task(insertSql, task_callback);
// 创建任务
auto serieswork = Workflow::create_series_work(insertTask, nullptr);
// 创建WaitGroup
WFFacilities::WaitGroup wait_group(1);
serieswork->set_callback([&wait_group](const SeriesWork*) {
wait_group.done();
});
// 开始SeriesWork
serieswork->start();
// 等待SeriesWork完成
wait_group.wait();
}
}
}
from workflow.
看起来你是在Windows下遇到的问题。你那边有Linux或macOS环境可以试一下吗?
from workflow.
linux环境比较麻烦,需要剥离数据库插入这部分代码适配
from workflow.
代码上看不出有什么问题,虽然在计算线程里等待的用法不太好,但和请求卡住没有关系。
我们看了一下你的堆栈卡在Windows的异步调用上,我们这边实在没有环境调试Windows的问题,你那边如果熟悉Windows能不能帮我们看一下,非常感谢!我们对Windows环境的支持一直缺少人力,实在不好意思。
from workflow.
机器是机械硬盘环境,如果不等待一下会导致mysql存储远远低于我push的速度。时间比较紧,先交付任务,我用mysql client sdk的API来处理mysql数据存储了,存储数目不对的问题已经暴露出来了,1000条数据中有几条带特殊字符导致,但是在callback中task->get_state() != WFT_STATE_SUCCESS没有检测出来,我晚些再跟踪一下。
from workflow.
机器是机械硬盘环境,如果不等待一下会导致mysql存储远远低于我push的速度。时间比较紧,先交付任务,我用mysql client sdk的API来处理mysql数据存储了,存储数目不对的问题已经暴露出来了,1000条数据中有几条带特殊字符导致,但是在callback中task->get_state() != WFT_STATE_SUCCESS没有检测出来,我晚些再跟踪一下。
关于特殊字符的问题,我们是可以得到错误的。task->get_state()只是得到任务的状态,这种情况下只要完成一次成功的mysql交互,得到的就是成功。而具体结果的状态,可以看一下result set。
from workflow.
@lrh450330 Hello,请问除了非法字符导致的请求失败,还有别的问题吗?通过对MySQLResult状态的检查,是否能解决问题?
from workflow.
非法字符导致的是请求失败,但是关键的死锁问题没有定位到。
from workflow.
Related Issues (20)
- http客户端发送消息 HOT 9
- 有没有复杂一些的http server例子? HOT 1
- 我看了workflow和wfrest,都写得很优雅,上手很快👍🏻。但是我有个疑问,Web服务器为什么不用go和gin呢,而要用workflow?选择workflow的优势有啥? HOT 2
- websocket并发问题咨询 HOT 2
- 用WFFileIOTask写入字符串到文件里出现乱码 HOT 2
- 在tutorial执行"make"报错"collect2: error: ld returned 1 exit status" HOT 7
- 咨询tutorial-05-http-proxy的reply_callback() HOT 2
- 咨询curl向workflow server上传文件,workflow server如何读取文件内容 HOT 2
- httpServerTask的回调函数中设置响应会回复默认响应 HOT 4
- 关于wait_group的详细用法 HOT 26
- 关于workflow TCP连接获取连接地址以及端口问题 HOT 1
- nossl分支tutorial make报错 HOT 5
- 关于GPL许可证兼容性的疑虑 HOT 1
- 支持PostgreSQL协议的讨论帖 HOT 17
- 关于mysql事务的回滚问题 HOT 3
- workflow充当HTTP网关时数据量太大就收不全 HOT 28
- workflow是否支持RPC协议? HOT 1
- WFNetworkTask http 服务怎样获取 post 请求的数据 HOT 2
- 咨询头文件报错问题 HOT 2
- handler_threads配置的问题 HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from workflow.