Code Monkey home page Code Monkey logo

Comments (26)

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024 2

非常感谢,问题解决了,现在连续查询1000次redis全都有结果,上面是源代码贴给有需要的同学

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

我参照官方提供的redis demo写了set和get两个接口也有问题,特别是先执行set再执行get会直接阻塞,连续运行get会直接报段错误
void Interface::setex(std::string& key, std::string& time, std::string& val) {
WFRedisTask *task;
redis_input_data data;
data.key = key, data.url = url;
task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
redis_callback);
protocol::RedisRequest *req = task->get_req();
task->user_data = &data;
req->set_request("SETEX", { data.key, time, val });
// series_of(task);
task->start();
wait_group.wait();
}

std::string Interface::get(std::string& key) {
    WFRedisTask *task;
    redis_input_data data;
    data.key = key, data.url = url;
    task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
										redis_callback);
    protocol::RedisRequest *req = task->get_req();
    task->user_data = &data;
    req->set_request("GET", { key });
    // series_of(task);
    task->start();
    wait_group.wait();
    return data.res;
}

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

先把代码格式调整一下吧。

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024
#include <stdio.h>
#include <iostream>
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"

void add(int a, int b, int& res)
{
    res = a + b;
}

int main(void)
{
WFFacilities::WaitGroup wait_group(1);
int a = 1;
int b = 1;
int res;

for (int i = 0; i < 10000; i++)
{
    WFGoTask *task = WFTaskFactory::create_go_task("test", add, a, b, std::ref(res));
    WFGoTask *task2 = WFTaskFactory::create_go_task("test", add, a, b, std::ref(res));
    task->set_callback([&](WFGoTask *task) {
        printf("%d + %d = %d\n", a, b, res);
        wait_group.done();
    });

    task2->set_callback([&](WFGoTask *task2) {
        printf("%d + %d = %d\n", a, b, res);
        wait_group.done();
    });

    task->start();
    task2->start();
    wait_group.wait();
    wait_group.wait();

    std::cout << "current i is: " << i << std::endl;
    }

    std::cout << "all task finished!" << std::endl;
    return 0;
}

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024
  void Interface::setex(std::string& key, std::string& time, std::string& val) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("SETEX", { data.key, time, val });
        // series_of(task);
        task->start();
        // wait_group.wait();
    }

    std::string Interface::get(std::string& key) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("GET", { key });
        // series_of(task);
        task->start();
        wait_group.wait();
        return data.res;
    }

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024
  1. 一般情况下,不要用wait group。
  2. 非要用,你第一段代码wait一次就可以了,初始值为2,不是1。
  3. 一个wait group只用一次。不能反复用同一个。

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

那我想封装一个Interface类,像上面的代码那样有一个setex和get方法,那这个时候我的wait group该定义在哪里?特别是get接口,我需要redis_callback运行完成后的返回值,这时候我应该要等异步任务运行完获取到结果才能结束掉这个get接口吧?

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

你放task里不就好了。有user data域可以用啊。

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

Series里也能放。怎么都不应该是多个任务共享的。

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

这是我的源代码,现在就卡在这个get接口这里了,麻烦帮我看一下

namespace RedisCli {
	struct redis_input_data
    {
        std::string url;
        std::string key;
        std::string res;
    };

    Interface::Interface(std:: string& url) : url(url) {};

    void redis_callback(WFRedisTask *task){
	protocol::RedisRequest *req = task->get_req();
	protocol::RedisResponse *resp = task->get_resp();
	int state = task->get_state();
	int error = task->get_error();
	protocol::RedisValue val;

	switch (state)
	{
	case WFT_STATE_SYS_ERROR:
		fprintf(stderr, "system error: %s\n", strerror(error));
		break;
	case WFT_STATE_DNS_ERROR:
		fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
		break;
	case WFT_STATE_SSL_ERROR:
		fprintf(stderr, "SSL error: %d\n", error);
		break;
	case WFT_STATE_TASK_ERROR:
		fprintf(stderr, "Task error: %d\n", error);
		break;
	case WFT_STATE_SUCCESS:
		resp->get_result(val);
		if (val.is_error())
		{
		   fprintf(stderr, "Error reply. Need a password?\n");
		   state = WFT_STATE_TASK_ERROR;
		}
		break;
	}

	std::string cmd;
	req->get_command(cmd);
	if (cmd == "SETEX")
	{
		std::cout << "SETEX success..." << std::endl;
	}
	else /* if (cmd == "GET") */
	{
		if (val.is_string())
		{
            redis_input_data *data2 = (redis_input_data *)task->user_data;
            data2->res = val.string_value().c_str();
		}
		else
		{
			fprintf(stderr, "Error: Not a string value. \n");
		}
	}

	// wait_group.done();
}

    void Interface::setex(std::string& key, std::string& time, std::string& val) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("SETEX", { data.key, time, val });
        // series_of(task);
        task->start();
        // wait_group.wait();
    }

    std::string Interface::get(std::string& key) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("GET", { key });
        // series_of(task);
        task->start();
        // wait_group.wait();
        return data.res;
    }

    Interface::~Interface() {
    }
}

int main() {
	std:: string url = "redis://127.0.0.1:6379/0";
	std:: string key = "a";
	std:: string val = "b";
	std:: string time = "600";
	RedisCli::Interface inter(url);

	for (int i = 0; i < 1000; i++)
	{
		std::string res = inter.get(key);
		std::cout << "\n the res is: " << res << "\n" << std::endl;
		std::cout << "顺利执行到 " << i << " 步" << std::endl;
	}
}

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

你这个代码不是没有wait吧?你的wait_group也可以直接写在get函数里啊。

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

wait_group也写在get函数的话,那redis_callback要用到wait_group咋办?

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

这个不是还在生命周期之内么?没问题的啊。

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

不过,最好不要到处使用wait_group。例如你上面这个程序,最好是在series里不断的push_back任务,main函数里等一次就可以了。

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

试了很久都没有办法,有时候查询还会失败,所以最后还是决定使用create_go_task + hiredis 了

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

我们的redis任务几乎是最容易用的…… 我还没见过用我们的redis没有成功的。

你先不要wait group,callback里把结果打一下再说。

或者你把程序贴完整一点,我直接帮你改了吧。

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

interface.h

#ifndef REDIS_INTER_H
#define REDIS_INTER_H
#define RETRY_MAX 2

#include <string.h>
#include <iostream>
// #include "_global.h"
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"


namespace RedisCli {
    static WFFacilities::WaitGroup wait_group(1);
    struct redis_input_data
    {
        std::string url;
        std::string key;
        std::string res;
    };

    class Interface {
    private:
        std::string url;

    public:
        Interface(std:: string& url);
        ~Interface();
        void static redis_callback(WFRedisTask *task);
        void setex(std::string& key, std::string& time, std::string& val);
        std::string get(std::string& key);
    };
}

#endif

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

interface.cpp

#include "interface.h"


namespace RedisCli {

    Interface::Interface(std:: string& url) : url(url) {};

    void Interface::redis_callback(WFRedisTask *task){
	protocol::RedisRequest *req = task->get_req();
	protocol::RedisResponse *resp = task->get_resp();
	int state = task->get_state();
	int error = task->get_error();
	protocol::RedisValue val;

	switch (state)
	{
	case WFT_STATE_SYS_ERROR:
		fprintf(stderr, "system error: %s\n", strerror(error));
		break;
	case WFT_STATE_DNS_ERROR:
		fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
		break;
	case WFT_STATE_SSL_ERROR:
		fprintf(stderr, "SSL error: %d\n", error);
		break;
	case WFT_STATE_TASK_ERROR:
		fprintf(stderr, "Task error: %d\n", error);
		break;
	case WFT_STATE_SUCCESS:
		resp->get_result(val);
		if (val.is_error())
		{
		   fprintf(stderr, "Error reply. Need a password?\n");
		   state = WFT_STATE_TASK_ERROR;
		}
		break;
	}

	std::string cmd;
	req->get_command(cmd);
	if (cmd == "SETEX")
	{
		std::cout << "SETEX success..." << std::endl;
	}
	else /* if (cmd == "GET") */
	{
		if (val.is_string())
		{
            redis_input_data *data2 = (redis_input_data *)task->user_data;
            data2->res = val.string_value().c_str();
		}
		else
		{
			fprintf(stderr, "Error: Not a string value. \n");
		}
	}

	wait_group.done();
}

    void Interface::setex(std::string& key, std::string& time, std::string& val) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("SETEX", { data.key, time, val });
        // series_of(task);
        task->start();
        // wait_group.wait();
    }

    std::string Interface::get(std::string& key) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("GET", { key });
        // series_of(task);
        task->start();
        wait_group.wait();
        return data.res;
    }

    Interface::~Interface() {
    }
}

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

main.cpp

#include "interface.h"
#include <thread>
#include <chrono>


int main() {
	std:: string url = "redis://127.0.0.1:6379/0";
	std:: string key = "a";
	std:: string val = "b";
	std:: string time = "600";
	RedisCli::Interface inter(url);

	for (int i = 0; i < 1; i++)
	{
		std::string res = inter.get(key);
		std::cout << "\n the res is: " << res << "\n" << std::endl;
		std::cout << "顺利执行到 " << i << " 步" << std::endl;
	}
}

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

这是我的源代码,麻烦帮忙看一下

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024
#include "interface.h"


namespace RedisCli {

    Interface::Interface(std:: string& url) : url(url) {};

    void Interface::redis_callback(WFRedisTask *task){
	protocol::RedisRequest *req = task->get_req();
	protocol::RedisResponse *resp = task->get_resp();
	int state = task->get_state();
	int error = task->get_error();
	protocol::RedisValue val;

	switch (state)
	{
	case WFT_STATE_SYS_ERROR:
		fprintf(stderr, "system error: %s\n", strerror(error));
		break;
	case WFT_STATE_DNS_ERROR:
		fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
		break;
	case WFT_STATE_SSL_ERROR:
		fprintf(stderr, "SSL error: %d\n", error);
		break;
	case WFT_STATE_TASK_ERROR:
		fprintf(stderr, "Task error: %d\n", error);
		break;
	case WFT_STATE_SUCCESS:
		resp->get_result(val);
		if (val.is_error())
		{
		   fprintf(stderr, "Error reply. Need a password?\n");
		   state = WFT_STATE_TASK_ERROR;
		}
		break;
	}

	std::string cmd;
	req->get_command(cmd);
        redis_input_data *data = (redis_input_data *)task->user_data;

	if (cmd == "SETEX")
	{
		std::cout << "SETEX success..." << std::endl;
	}
	else /* if (cmd == "GET") */
	{
		if (val.is_string())
		{
            data->res = val.string_value().c_str();
		}
		else
		{
			fprintf(stderr, "Error: Not a string value. \n");
		}
	}

	data->wait_group.done();
}

    void Interface::setex(std::string& key, std::string& time, std::string& val) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("SETEX", { data.key, time, val });
        // series_of(task);
        task->start();
        data.wait_group.wait();
    }

    std::string Interface::get(std::string& key) {
        WFRedisTask *task;
        redis_input_data data;
        data.key = key, data.url = url;
        task = WFTaskFactory::create_redis_task(data.url, RETRY_MAX,
											redis_callback);
        protocol::RedisRequest *req = task->get_req();
        task->user_data = &data;
        req->set_request("GET", { key });
        // series_of(task);
        task->start();
        data.wait_group.wait();
        return data.res;
    }

    Interface::~Interface() {
    }
}

我没有看到你的struct redis_input_data的定义,你把wait_group放到data里就可以了。很简单的一个事。
麻烦star一下项目吧。

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

好,我先看一下

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

我定义了redis的输入结构,但不知道wai_group应该定义在哪里,以及如何再callback使用

struct redis_input_data
{
std::string url;
std::string key;
std::string res;
};

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

我定义了redis的输入结构,但不知道wai_group应该定义在哪里,以及如何再callback使用

struct redis_input_data { std::string url; std::string key; std::string res; };

struct redis_input_data
{
    std::string url;
    std::string key;
    std::string res;
    WFFacilities::WaitGroup wait_group{1};
};

还不会?

from workflow.

WanwanLinLin avatar WanwanLinLin commented on June 26, 2024

interface.h

#ifndef REDIS_INTER_H
#define REDIS_INTER_H
#define RETRY_MAX 2

#include <string.h>
#include <iostream>
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"


namespace RedisCli {
    // static WFFacilities::WaitGroup wait_group(1);
    struct redis_input_data
    {
        std::string url;
        std::string key;
        std::string res;
        WFFacilities::WaitGroup wait_group{1};
    };

    class Interface {
    private:
        std::string url;

    public:
        Interface(std:: string& url);
        ~Interface();
        void static redis_callback(WFRedisTask *task);
        void setex(std::string& key, std::string& time, std::string& val);
        std::string get(std::string& key);
    };
}

#endif

interface.cpp

#ifndef REDIS_INTER_H
#define REDIS_INTER_H
#define RETRY_MAX 2

#include <string.h>
#include <iostream>
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"


namespace RedisCli {
    // static WFFacilities::WaitGroup wait_group(1);
    struct redis_input_data
    {
        std::string url;
        std::string key;
        std::string res;
        WFFacilities::WaitGroup wait_group{1};
    };

    class Interface {
    private:
        std::string url;

    public:
        Interface(std:: string& url);
        ~Interface();
        void static redis_callback(WFRedisTask *task);
        void setex(std::string& key, std::string& time, std::string& val);
        std::string get(std::string& key);
    };
}

#endif

main.cpp

#include "interface.h"
#include <thread>
#include <chrono>


int main() {
	std::string url = "redis://127.0.0.1:6390/0";
	std::string key = "aaa-abc";
	std::string val = "b";
	std::string time = "600";
	RedisCli::Interface inter(url);

	for (int i = 0; i < 1000; i++)
	{
		std::string res = inter.get(key);
		std::cout << "\n the res is: " << res << "\n" << std::endl;
		std::cout << "顺利执行到 " << i << " 步" << std::endl;
	}
}

from workflow.

Barenboim avatar Barenboim commented on June 26, 2024

之前你在类里定义一个static变量,肯定不对啊……

好的,有什么问题随时issue吧。

from workflow.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.