Comments (26)
非常感谢,问题解决了,现在连续查询1000次redis全都有结果,上面是源代码贴给有需要的同学
from workflow.
我参照官方提供的redis demo写了set和get两个接口也有问题,特别是先执行set再执行get会直接阻塞,连续运行get会直接报段错误
void Interface::setex(std::string& key, std::string& time, std::string& val) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("SETEX", { data.key, time, val });
// series_of(task);
task->start();
wait_group.wait();
}
std::string Interface::get(std::string& key) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("GET", { key });
// series_of(task);
task->start();
wait_group.wait();
return data.res;
}
from workflow.
先把代码格式调整一下吧。
from workflow.
#include <stdio.h>
#include <iostream>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
void add(int a, int b, int& res)
{
res = a + b;
}
int main(void)
{
WFFacilities::WaitGroup wait_group(1);
int a = 1;
int b = 1;
int res;
for (int i = 0; i < 10000; i++)
{
WFGoTask *task = WFTaskFactory::create_go_task("test", add, a, b, std::ref(res));
WFGoTask *task2 = WFTaskFactory::create_go_task("test", add, a, b, std::ref(res));
task->set_callback([&](WFGoTask *task) {
printf("%d + %d = %d\n", a, b, res);
wait_group.done();
});
task2->set_callback([&](WFGoTask *task2) {
printf("%d + %d = %d\n", a, b, res);
wait_group.done();
});
task->start();
task2->start();
wait_group.wait();
wait_group.wait();
std::cout << "current i is: " << i << std::endl;
}
std::cout << "all task finished!" << std::endl;
return 0;
}
from workflow.
void Interface::setex(std::string& key, std::string& time, std::string& val) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("SETEX", { data.key, time, val });
// series_of(task);
task->start();
// wait_group.wait();
}
std::string Interface::get(std::string& key) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("GET", { key });
// series_of(task);
task->start();
wait_group.wait();
return data.res;
}
from workflow.
- 一般情况下,不要用wait group。
- 非要用,你第一段代码wait一次就可以了,初始值为2,不是1。
- 一个wait group只用一次。不能反复用同一个。
from workflow.
那我想封装一个Interface类,像上面的代码那样有一个setex和get方法,那这个时候我的wait group该定义在哪里?特别是get接口,我需要redis_callback运行完成后的返回值,这时候我应该要等异步任务运行完获取到结果才能结束掉这个get接口吧?
from workflow.
你放task里不就好了。有user data域可以用啊。
from workflow.
Series里也能放。怎么都不应该是多个任务共享的。
from workflow.
这是我的源代码,现在就卡在这个get接口这里了,麻烦帮我看一下
namespace RedisCli {
struct redis_input_data
{
std::string url;
std::string key;
std::string res;
};
Interface::Interface(std:: string& url) : url(url) {};
void redis_callback(WFRedisTask *task){
protocol::RedisRequest *req = task->get_req();
protocol::RedisResponse *resp = task->get_resp();
int state = task->get_state();
int error = task->get_error();
protocol::RedisValue val;
switch (state)
{
case WFT_STATE_SYS_ERROR:
fprintf(stderr, "system error: %s\n", strerror(error));
break;
case WFT_STATE_DNS_ERROR:
fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
break;
case WFT_STATE_SSL_ERROR:
fprintf(stderr, "SSL error: %d\n", error);
break;
case WFT_STATE_TASK_ERROR:
fprintf(stderr, "Task error: %d\n", error);
break;
case WFT_STATE_SUCCESS:
resp->get_result(val);
if (val.is_error())
{
fprintf(stderr, "Error reply. Need a password?\n");
state = WFT_STATE_TASK_ERROR;
}
break;
}
std::string cmd;
req->get_command(cmd);
if (cmd == "SETEX")
{
std::cout << "SETEX success..." << std::endl;
}
else /* if (cmd == "GET") */
{
if (val.is_string())
{
redis_input_data *data2 = (redis_input_data *)task->user_data;
data2->res = val.string_value().c_str();
}
else
{
fprintf(stderr, "Error: Not a string value. \n");
}
}
// wait_group.done();
}
void Interface::setex(std::string& key, std::string& time, std::string& val) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("SETEX", { data.key, time, val });
// series_of(task);
task->start();
// wait_group.wait();
}
std::string Interface::get(std::string& key) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("GET", { key });
// series_of(task);
task->start();
// wait_group.wait();
return data.res;
}
Interface::~Interface() {
}
}
int main() {
std:: string url = "redis://127.0.0.1:6379/0";
std:: string key = "a";
std:: string val = "b";
std:: string time = "600";
RedisCli::Interface inter(url);
for (int i = 0; i < 1000; i++)
{
std::string res = inter.get(key);
std::cout << "\n the res is: " << res << "\n" << std::endl;
std::cout << "顺利执行到 " << i << " 步" << std::endl;
}
}
from workflow.
你这个代码不是没有wait吧?你的wait_group也可以直接写在get函数里啊。
from workflow.
wait_group也写在get函数的话,那redis_callback要用到wait_group咋办?
from workflow.
这个不是还在生命周期之内么?没问题的啊。
from workflow.
不过,最好不要到处使用wait_group。例如你上面这个程序,最好是在series里不断的push_back任务,main函数里等一次就可以了。
from workflow.
试了很久都没有办法,有时候查询还会失败,所以最后还是决定使用create_go_task + hiredis 了
from workflow.
我们的redis任务几乎是最容易用的…… 我还没见过用我们的redis没有成功的。
你先不要wait group,callback里把结果打一下再说。
或者你把程序贴完整一点,我直接帮你改了吧。
from workflow.
interface.h
#ifndef REDIS_INTER_H
#define REDIS_INTER_H
#define RETRY_MAX 2
#include <string.h>
#include <iostream>
// #include "_global.h"
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
namespace RedisCli {
static WFFacilities::WaitGroup wait_group(1);
struct redis_input_data
{
std::string url;
std::string key;
std::string res;
};
class Interface {
private:
std::string url;
public:
Interface(std:: string& url);
~Interface();
void static redis_callback(WFRedisTask *task);
void setex(std::string& key, std::string& time, std::string& val);
std::string get(std::string& key);
};
}
#endif
from workflow.
interface.cpp
#include "interface.h"
namespace RedisCli {
Interface::Interface(std:: string& url) : url(url) {};
void Interface::redis_callback(WFRedisTask *task){
protocol::RedisRequest *req = task->get_req();
protocol::RedisResponse *resp = task->get_resp();
int state = task->get_state();
int error = task->get_error();
protocol::RedisValue val;
switch (state)
{
case WFT_STATE_SYS_ERROR:
fprintf(stderr, "system error: %s\n", strerror(error));
break;
case WFT_STATE_DNS_ERROR:
fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
break;
case WFT_STATE_SSL_ERROR:
fprintf(stderr, "SSL error: %d\n", error);
break;
case WFT_STATE_TASK_ERROR:
fprintf(stderr, "Task error: %d\n", error);
break;
case WFT_STATE_SUCCESS:
resp->get_result(val);
if (val.is_error())
{
fprintf(stderr, "Error reply. Need a password?\n");
state = WFT_STATE_TASK_ERROR;
}
break;
}
std::string cmd;
req->get_command(cmd);
if (cmd == "SETEX")
{
std::cout << "SETEX success..." << std::endl;
}
else /* if (cmd == "GET") */
{
if (val.is_string())
{
redis_input_data *data2 = (redis_input_data *)task->user_data;
data2->res = val.string_value().c_str();
}
else
{
fprintf(stderr, "Error: Not a string value. \n");
}
}
wait_group.done();
}
void Interface::setex(std::string& key, std::string& time, std::string& val) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("SETEX", { data.key, time, val });
// series_of(task);
task->start();
// wait_group.wait();
}
std::string Interface::get(std::string& key) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("GET", { key });
// series_of(task);
task->start();
wait_group.wait();
return data.res;
}
Interface::~Interface() {
}
}
from workflow.
main.cpp
#include "interface.h"
#include <thread>
#include <chrono>
int main() {
std:: string url = "redis://127.0.0.1:6379/0";
std:: string key = "a";
std:: string val = "b";
std:: string time = "600";
RedisCli::Interface inter(url);
for (int i = 0; i < 1; i++)
{
std::string res = inter.get(key);
std::cout << "\n the res is: " << res << "\n" << std::endl;
std::cout << "顺利执行到 " << i << " 步" << std::endl;
}
}
from workflow.
这是我的源代码,麻烦帮忙看一下
from workflow.
#include "interface.h"
namespace RedisCli {
Interface::Interface(std:: string& url) : url(url) {};
void Interface::redis_callback(WFRedisTask *task){
protocol::RedisRequest *req = task->get_req();
protocol::RedisResponse *resp = task->get_resp();
int state = task->get_state();
int error = task->get_error();
protocol::RedisValue val;
switch (state)
{
case WFT_STATE_SYS_ERROR:
fprintf(stderr, "system error: %s\n", strerror(error));
break;
case WFT_STATE_DNS_ERROR:
fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
break;
case WFT_STATE_SSL_ERROR:
fprintf(stderr, "SSL error: %d\n", error);
break;
case WFT_STATE_TASK_ERROR:
fprintf(stderr, "Task error: %d\n", error);
break;
case WFT_STATE_SUCCESS:
resp->get_result(val);
if (val.is_error())
{
fprintf(stderr, "Error reply. Need a password?\n");
state = WFT_STATE_TASK_ERROR;
}
break;
}
std::string cmd;
req->get_command(cmd);
redis_input_data *data = (redis_input_data *)task->user_data;
if (cmd == "SETEX")
{
std::cout << "SETEX success..." << std::endl;
}
else /* if (cmd == "GET") */
{
if (val.is_string())
{
data->res = val.string_value().c_str();
}
else
{
fprintf(stderr, "Error: Not a string value. \n");
}
}
data->wait_group.done();
}
void Interface::setex(std::string& key, std::string& time, std::string& val) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("SETEX", { data.key, time, val });
// series_of(task);
task->start();
data.wait_group.wait();
}
std::string Interface::get(std::string& key) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("GET", { key });
// series_of(task);
task->start();
data.wait_group.wait();
return data.res;
}
Interface::~Interface() {
}
}
我没有看到你的struct redis_input_data的定义,你把wait_group放到data里就可以了。很简单的一个事。
麻烦star一下项目吧。
from workflow.
好,我先看一下
from workflow.
我定义了redis的输入结构,但不知道wai_group应该定义在哪里,以及如何再callback使用
struct redis_input_data
{
std::string url;
std::string key;
std::string res;
};
from workflow.
我定义了redis的输入结构,但不知道wai_group应该定义在哪里,以及如何再callback使用
struct redis_input_data { std::string url; std::string key; std::string res; };
struct redis_input_data
{
std::string url;
std::string key;
std::string res;
WFFacilities::WaitGroup wait_group{1};
};
还不会?
from workflow.
interface.h
#ifndef REDIS_INTER_H
#define REDIS_INTER_H
#define RETRY_MAX 2
#include <string.h>
#include <iostream>
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
namespace RedisCli {
// static WFFacilities::WaitGroup wait_group(1);
struct redis_input_data
{
std::string url;
std::string key;
std::string res;
WFFacilities::WaitGroup wait_group{1};
};
class Interface {
private:
std::string url;
public:
Interface(std:: string& url);
~Interface();
void static redis_callback(WFRedisTask *task);
void setex(std::string& key, std::string& time, std::string& val);
std::string get(std::string& key);
};
}
#endif
interface.cpp
#ifndef REDIS_INTER_H
#define REDIS_INTER_H
#define RETRY_MAX 2
#include <string.h>
#include <iostream>
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
namespace RedisCli {
// static WFFacilities::WaitGroup wait_group(1);
struct redis_input_data
{
std::string url;
std::string key;
std::string res;
WFFacilities::WaitGroup wait_group{1};
};
class Interface {
private:
std::string url;
public:
Interface(std:: string& url);
~Interface();
void static redis_callback(WFRedisTask *task);
void setex(std::string& key, std::string& time, std::string& val);
std::string get(std::string& key);
};
}
#endif
main.cpp
#include "interface.h"
#include <thread>
#include <chrono>
int main() {
std::string url = "redis://127.0.0.1:6390/0";
std::string key = "aaa-abc";
std::string val = "b";
std::string time = "600";
RedisCli::Interface inter(url);
for (int i = 0; i < 1000; i++)
{
std::string res = inter.get(key);
std::cout << "\n the res is: " << res << "\n" << std::endl;
std::cout << "顺利执行到 " << i << " 步" << std::endl;
}
}
from workflow.
之前你在类里定义一个static变量,肯定不对啊……
好的,有什么问题随时issue吧。
from workflow.
Related Issues (20)
- workflow充当HTTP网关时数据量太大就收不全 HOT 28
- workflow是否支持RPC协议? HOT 1
- WFNetworkTask http 服务怎样获取 post 请求的数据 HOT 2
- 咨询头文件报错问题 HOT 2
- handler_threads配置的问题 HOT 6
- c++使用wfkafka 调用create_kafka_task崩溃 HOT 38
- 如何优雅的取消DAG中的后续图节点 HOT 2
- myssql cli 使用是否可以创建多个任务 HOT 34
- AcceptEx and GetAcceptExSockaddrs gives error on Windows HOT 2
- Windows与其他平台是单独release吗? 我看11.4没有windows的tag, vcpkg也只有10.9 HOT 2
- 能否给xmake中的workflow加上windows的支持 HOT 6
- redis zadd命令使用,set_request第二个参数是vector<stirng>, 没法添加int类型 score HOT 36
- Turkish Character Support on get_parsed_body HOT 5
- Get All Headers of HTTP Request HOT 5
- http请求获取对方应答的数据接口是哪一个 HOT 1
- 能支持mqtt或zmq吗 HOT 1
- http server服务,使用WFGoTask处理请求结束后线程数没有降低 HOT 7
- 机器的连接达到了上限,如何关闭TIME_WAIT连接。 HOT 2
- 请教一个kafka 11错误码的问题 HOT 5
- workflow 如何支持 fast-dds? HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from workflow.