
ghkdqhrbals / spring-chatting-server

In this project, we develop a chat server with automatic distribution, automatic scale in/out, and maintenance functions! For more information, please visit https://ghkdqhrbals.github.io/portfolios/docs/project

License: MIT License

HTML 9.58% Java 77.94% Shell 3.15% Dockerfile 0.69% CSS 8.59% Go 0.05%
kafka spring-boot spring-cloud aws-cloudwatch aws-ecr aws-eks elk-stack github-actions kubernetes load-testing

spring-chatting-server's Introduction

This project aims to experimentally apply a variety of technologies. If you would like to follow the project's progress, please refer to the blog. Development has focused on automation and performance improvement.

As of 5.2.0, the migration to EKS is complete. To run locally, move to the 4.2.0 version tag and follow the Instructions (based on Corretto JDK 17).
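
For example, to move to that tag locally (a sketch; check git tag for the exact tag name, e.g. v4.2.0):

git checkout v4.2.0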

Testing and performance-improvement records are kept in Issues under the feature:performance label. If you would like more detail, see the performance improvement log!

Kubernetes Pods Architecture

image

Scalability

image

CI/CD(Git Actions multijob)

image

spring-chatting-server's People

Contributors

ghkdqhrbals, ghkdqhrbals2, mun-kyeong


spring-chatting-server's Issues

Access denied in ECR

Description

In GitHub Actions I got the following error.

Error response from daemon: pull access denied for ***.dkr.ecr.ap-northeast-2.amazonaws.com, repository does not exist or may require 'docker login': denied: requested access to the resource is denied

I assume this is because of a wrong ECR URL, as in #107.
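
For reference, before a private ECR image can be pulled, the runner (or the EC2 host) has to be logged in to that registry. A minimal sketch using the AWS CLI v2 (the region and account ID placeholder are examples, not values taken from the workflow):

aws ecr get-login-password --region ap-northeast-2 \
  | docker login --username AWS --password-stdin <account-id>.dkr.ecr.ap-northeast-2.amazonaws.com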

Got status code 409 when sending multiple REST API requests

While testing my chatting server with the following Go test code, an unintended error occurs: a response with status code 409.

Here's my test code, written in Go.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"
)

func worker(finished chan map[int]int, requestURL string, bodyReader *bytes.Reader, client http.Client, transferRatePerSecond int) {

	// create a local map
	m := make(map[int]int)

	for i := 0; i < transferRatePerSecond; i++ {
		// create the request
		req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader)
		if err != nil {
			fmt.Printf("client: could not create request: %s\n", err)
			os.Exit(1)
		}

		// set the JSON header
		req.Header.Set("Content-Type", "application/json")

		// send the request and get the response
		res, _ := client.Do(req)

		// record the status code in the local map
		m[res.StatusCode] += 1
	}

	// when finished, send the local map over the channel
	finished <- m
}

func main() {

	argsWithProg := os.Args
	argsWithoutProg := os.Args[1:]
	fmt.Println(argsWithProg)
	fmt.Println(argsWithoutProg)

	// target URL for the HTTP requests
	requestURL := argsWithoutProg[0]

	// path of the JSON file, e.g. user
	// (the .json extension is omitted)
	jsonReq := argsWithoutProg[1]

	// number of requests to send
	transferRatePerSecond, _ := strconv.Atoi(argsWithoutProg[2])

	// read the JSON file
	jsonFile, err := os.Open(jsonReq + ".json")
	if err != nil {
		fmt.Println(err)
	}
	defer jsonFile.Close()

	// client setup and timeout
	client := http.Client{
		Timeout: 30 * time.Second,
	}

	// convert the JSON file to bytes
	jsonBody, _ := ioutil.ReadAll(jsonFile)
	bodyReader := bytes.NewReader(jsonBody)

	// concurrent map shared across goroutines
	var contexts = sync.Map{}
	finished := make(chan map[int]int)

	// multi-goroutine HTTP requests
	go worker(finished, requestURL, bodyReader, client, transferRatePerSecond)

	// wait on the channel for completion and receive the worker's map
	m := <-finished

	// merge the worker's results into the concurrent map
	for k, v := range m {
		result, ok := contexts.Load(k)
		if ok {
			contexts.Store(k, result.(int)+v)
		} else {
			contexts.Store(k, v)
		}
	}

	// print the request count per HTTP status code
	contexts.Range(func(k, v interface{}) bool {
		fmt.Println("status: ", k, " count: ", v)
		return true
	})
}

And my user.json file looks like

{
    "userId":"a",
    "userName": "a",
    "email":"[email protected]",
    "userPw":"1234"
}

And here's my console output when running the test code.

gyuminhwangbo@Gyuminui-MacBookPro testing % go run main.go http://localhost:8080/auth/user user 50
[http://localhost:8080/auth/user user 50]
status:  409  count:  1 #<--- unIntended
status:  400  count:  49 #<--- Intended

This is my server log.

auth-server               | 2023-01-16 05:55:17.348  INFO 1 --- [nio-8085-exec-9] chatting.chat.web.UserController         : request URI=/auth/user
auth-server               | 2023-01-16 05:55:17.350 DEBUG 1 --- [nio-8085-exec-9] org.hibernate.SQL                        : 
auth-server               |     select
auth-server               |         user0_.user_id as user_id1_0_0_,
auth-server               |         user0_.email as email2_0_0_,
auth-server               |         user0_.join_date as join_dat3_0_0_,
auth-server               |         user0_.login_date as login_da4_0_0_,
auth-server               |         user0_.logout_date as logout_d5_0_0_,
auth-server               |         user0_.user_name as user_nam6_0_0_,
auth-server               |         user0_.user_pw as user_pw7_0_0_ 
auth-server               |     from
auth-server               |         user_table user0_ 
auth-server               |     where
auth-server               |         user0_.user_id=?
auth-server               | 2023-01-16 05:55:17.351 TRACE 1 --- [nio-8085-exec-9] o.h.type.descriptor.sql.BasicBinder      : binding parameter [1] as [VARCHAR] - [a]
auth-server               | 2023-01-16 05:55:17.355 ERROR 1 --- [nio-8085-exec-9] c.chat.web.error.GlobalExceptionHandler  : handleCustomException throw CustomException : DUPLICATE_RESOURCE

# ------------------------------ UnIntended ------------------------------
nginx                     | 192.168.192.1 - - [16/Jan/2023:05:55:17 +0000] "POST /auth/user HTTP/1.1" 409 162 "-" "Go-http-client/1.1" "-"
# ------------------------------ UnIntended ------------------------------

nginx                     | 192.168.192.1 - - [16/Jan/2023:05:55:17 +0000] "POST /auth/user HTTP/1.1" 400 0 "-" "Go-http-client/1.1" "-"
nginx                     | 192.168.192.1 - - [16/Jan/2023:05:55:17 +0000] "POST /auth/user HTTP/1.1" 400 0 "-" "Go-http-client/1.1" "-"
nginx                     | 192.168.192.1 - - [16/Jan/2023:05:55:17 +0000] "POST /auth/user HTTP/1.1" 400 0 "-" "Go-http-client/1.1" "-"
nginx                     | 192.168.192.1 - - [16/Jan/2023:05:55:17 +0000] "POST /auth/user HTTP/1.1" 400 0 "-" "Go-http-client/1.1" "-"

[Document] KafkaMQ idempotency setting

Description

In Kafka configuration, if idempotency is true and the number of retries is set to 3, will there be no conflicts?

As far as I know, idempotence assigns a unique message ID to each new message and prevents re-sending with the same message ID.

When a message transfer fails (ACK or SEQ), the producer will re-send the message to Kafka with the same ACK, SEQ, and message ID. But if idempotence is set to true, I think the producer will not send any message that has the same ACK, SEQ, and message ID.

Do I understand this concept right? It is a little bit confusing :(

stack overflow issue reproduction : If idempotency is true and the number of retries is set to 3, will there be no conflicts?

Module compile error

Description

When adding common-dto to the front-server module and building it, IntelliJ cannot find the common-dto module. The error is shown below.

:spring-chatting-front-server:test: Could not resolve project :common-dto.
Required by:
    project :spring-chatting-front-server

Possible solution:
 - Declare repository providing the artifact, see the documentation at https://docs.gradle.org/current/userguide/declaring_repositories.html

[AWS] Still lacking server resources

Description

Finally I made the CI/CD pipelines. But the EC2 instance's CPU is under heavy load. We need big improvements in our resource management.

image

Absence of io.confluent.connect.jdbc.JdbcSinkConnector

When I was running the JDBC connector with the docker compose file, I sent the request below.

  • REQUEST
    POST http://localhost:8091/connector-plugins

  • RESPONSE

    [
        {
            "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
            "type": "source",
            "version": "7.3.1-ccs"
        },
        {
            "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
            "type": "source",
            "version": "7.3.1-ccs"
        },
        {
            "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
            "type": "source",
            "version": "7.3.1-ccs"
        }
    ]
    

And I found that there is no io.confluent.connect.jdbc.JdbcSinkConnector here.

To connect to Postgres, the JDBC connector plugin (with its driver) must first be installed into Kafka Connect.
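
For example, on a Confluent-based Connect image the plugin can be added with the confluent-hub client before the worker starts (a sketch; the plugin version shown is an example, not the one this project pins):

confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.6.0

After the Connect worker restarts, io.confluent.connect.jdbc.JdbcSinkConnector should then appear in the GET /connector-plugins response.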

Multithreading in JPA

I have some issues with JPA and multithreading.

I found that the default repository methods (save, saveAll, find, etc.) are not compatible with multithreading.

So I changed the JPA methods to native queries.

Thymeleaf th:action form error

Description

I want to send an HTTP POST to log in, but the logic fails.

<form action="../friends/friends.html" th:action th:object="${loginForm}" method="post" id="login-form">
    <div th:if="${#fields.hasGlobalErrors()}">
        <p class="field-error" th:each="err : ${#fields.globalErrors()}" th:text="${err}">전체 오류 메시지</p>
    </div>

    <input name="username" type="text" th:field="*{loginId}" th:errorclass="field-error" placeholder="Email or phone number"/>
    <p class="field-error" th:errors="*{loginId}" >오류메세지</p>

    <input name="password" th:field="*{password}" type="password" placeholder="Password"/>
    <p class="field-error" th:errors="*{password}" >오류메세지</p>


    <input type="submit" onclick="location.href='../home.html'" th:onclick='/home' value="Log In"/>

    <a href="#" >Find Kakao Account or Password, or</a>
    <a style="margin-top: 10px" th:href='@{/user/register}' href="../users/addUserFormSingle.html" >Create Account</a>
</form>

Connection refused when testing embedded Redis

Description

When I tested the Redis repository with embedded Redis, it returned Connection refused.

Currently I have implemented the embedded Redis configuration, and also added it in Gradle.

  • EmbeddedRedisConfig.class
@Slf4j //lombok
@Profile("local") // profile local
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    public EmbeddedRedisConfig() {
        log.info("Local Redis Server");
    }

    @PostConstruct
    public void redisServer() throws IOException {
        log.info("Local Redis Server Start");
        redisServer = new RedisServer(redisPort);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis() {
        log.info("Local Redis Server Stop");
        if (redisServer != null) {
            redisServer.stop();
        }
    }
}
  • application.yaml in test
spring:
  profiles:
    active: local
  ...
  redis:
    host: localhost
    port: 6379
  • UserRepositoryJDBCTest.class that extends UnitTest
@DisplayName("User Save Batch Test")
class UserRepositoryJDBCTest extends UnitTest {

    @Nested
    @DisplayName("Save User")
    class insertUserBatch{
        @Test
        @DisplayName("Saving single user")
        void saveUserList(){
            // given
            LocalDateTime now = LocalDateTime.now();
            User user = User.builder()
                    .userId("aa")
                    .userPw("1234")
                    .userName("Hwang")
                    .role("USER_ROLE")
                    .email("[email protected]")
                    .loginDate(now)
                    .logoutDate(now)
                    .joinDate(now)
                    .build();

            // when
            userRepositoryJDBC.saveAll(Arrays.asList(user));

            // then
            User savedUser = userRepository.findById("aa").orElseThrow(RuntimeException::new);
            assertThat(savedUser.getUserPw()).isEqualTo(user.getUserPw());
            assertThat(savedUser.getUserName()).isEqualTo(user.getUserName());
            assertThat(savedUser.getRole()).isEqualTo(user.getRole());
            assertThat(Timestamp.valueOf(savedUser.getLogoutDate())).isEqualTo(Timestamp.valueOf(user.getLogoutDate()));
            assertThat(Timestamp.valueOf(savedUser.getLoginDate())).isEqualTo(Timestamp.valueOf(user.getLoginDate()));
            assertThat(Timestamp.valueOf(savedUser.getJoinDate())).isEqualTo(Timestamp.valueOf(user.getJoinDate()));
        }
    }
}
  • UnitTest.class
@SpringBootTest
@EnableAutoConfiguration(exclude = {KafkaAutoConfiguration.class, RedisAutoConfiguration.class})
public class UnitTest {
    @SpyBean
    protected UserRepositoryJDBC userRepositoryJDBC;
    @SpyBean
    protected UserRepository userRepository;
    @SpyBean
    protected UserTransactionRedisRepository userTransactionRedisRepository;

    @MockBean
    protected KafkaTemplate<String, Object> kafkaProducerTemplate;

    @BeforeEach
    void setup(){
        userRepository.deleteAll();
        userTransactionRedisRepository.deleteAll();
    }
}

Expected behavior

I want the embedded Redis repository test to run successfully.

Error code

Unable to connect to Redis
org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis
	at app//org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1602)
	at app//org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1533)
	at app//org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1358)
	at app//org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1341)
	at app//org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedConnection(LettuceConnectionFactory.java:1059)
	at app//org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:398)
	at app//org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193)
	at app//org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144)
	at app//org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105)
	at app//org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:393)
	at app//org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:373)
	at app//org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:360)
	at app//org.springframework.data.redis.core.RedisKeyValueAdapter.deleteAllOf(RedisKeyValueAdapter.java:378)
	at app//org.springframework.data.keyvalue.core.KeyValueTemplate.lambda$delete$4(KeyValueTemplate.java:266)
	at app//org.springframework.data.keyvalue.core.KeyValueTemplate.execute(KeyValueTemplate.java:314)
	at app//org.springframework.data.keyvalue.core.KeyValueTemplate.delete(KeyValueTemplate.java:264)
	at app//org.springframework.data.keyvalue.repository.support.SimpleKeyValueRepository.deleteAll(SimpleKeyValueRepository.java:162)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at [email protected]/java.lang.reflect.Method.invoke(Method.java:568)
	at app//org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:288)
	at app//org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:136)
	at app//org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:120)
	at app//org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:516)
	at app//org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:285)
	at app//org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:628)
	at app//org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at app//org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:168)
	at app//org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:143)
	at app//org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at app//org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
	at app//org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at app//org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:218)
	at app/jdk.proxy3/jdk.proxy3.$Proxy203.deleteAll(Unknown Source)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at [email protected]/java.lang.reflect.Method.invoke(Method.java:568)
	at app//org.mockito.internal.util.reflection.ReflectionMemberAccessor.invoke(ReflectionMemberAccessor.java:48)
	at app//org.mockito.internal.stubbing.defaultanswers.ForwardsInvocations.answer(ForwardsInvocations.java:48)
	at app//org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:111)
	at app//org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
	at app//org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:56)
	at app//org.mockito.internal.creation.bytebuddy.MockMethodInterceptor$DispatcherDefaultingToRealMethod.interceptAbstract(MockMethodInterceptor.java:161)
	at app//com.example.shopuserservice.domain.user.redisrepository.UserTransactionRedisRepository$MockitoMock$7tFq7lQy.deleteAll(Unknown Source)
	at app//com.example.shopuserservice.UnitTest.setup(UnitTest.java:42)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at [email protected]/java.lang.reflect.Method.invoke(Method.java:568)
	at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
	at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
	at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128)
	at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78)
	at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
	at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
	at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
	at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
	at app//org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:520)
	at app//org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeBeforeEachMethodAdapter$23(ClassBasedTestDescriptor.java:505)
	at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachMethods$3(TestMethodTestDescriptor.java:174)
	at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$6(TestMethodTestDescriptor.java:202)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:202)
	at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachMethods(TestMethodTestDescriptor.java:171)
	at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:134)
	at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at [email protected]/java.util.ArrayList.forEach(ArrayList.java:1511)
	at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at [email protected]/java.util.ArrayList.forEach(ArrayList.java:1511)
	at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at [email protected]/java.util.ArrayList.forEach(ArrayList.java:1511)
	at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at app//org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at app//org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
	at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at [email protected]/java.lang.reflect.Method.invoke(Method.java:568)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
	at jdk.proxy1/jdk.proxy1.$Proxy2.stop(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
	at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
	at app//worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost/<unresolved>:6379
	at app//io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
	at app//io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
	at app//io.lettuce.core.AbstractRedisClient.getConnection(AbstractRedisClient.java:350)
	at app//io.lettuce.core.RedisClient.connect(RedisClient.java:216)
	at app//org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.lambda$getConnection$1(StandaloneConnectionProvider.java:111)
	at [email protected]/java.util.Optional.orElseGet(Optional.java:364)
	at app//org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.getConnection(StandaloneConnectionProvider.java:111)
	at app//org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1531)
	... 144 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:6379
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

[AWS] CI/CD pipelining all the way through AWS and errors

Description

Broken pipe when running CI/CD with GitHub workflows. See the Actions run: https://github.com/ghkdqhrbals/spring-chatting-server/actions/runs/6282696603

  • deploy.yaml
name: Deploy to EC2 with Docker Compose

on:
  push:
    branches:
      - main
  workflow_run:
    workflows: ["Build & Test"]
    types:
      - completed

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
        with:
          fetch-depth: 0  # get all history so we can checkout any branch
      - name: Get latest tag
        id: latesttag
        run: |
          LATEST_TAG=$(git describe --tags --abbrev=0)
          echo "LATEST_TAG=$LATEST_TAG" >> $GITHUB_ENV 

      # Increment version number(ex) 5.0.1 -> 5.0.2)
      # PR title contains "[patch]" -> 5.0.1 -> 5.0.2
      # PR title contains "[minor]" -> 5.0.1 -> 5.1.0
      # PR title contains "[major]" -> 5.0.1 -> 6.0.0
      - name: Increment version based on commit message
        id: increment_version
        run: |
          current_version=${LATEST_TAG#"v"}
          IFS='.' read -ra version_parts <<< "$current_version"
          
          major=${version_parts[0]}
          minor=${version_parts[1]}
          patch=${version_parts[2]}
          
          pr_title=${{ github.event.pull_request.title }}
          
          if [[ $pr_title == *"[major]"* ]]; then
            major=$(( major + 1 ))
            minor=0
            patch=0
          elif [[ $pr_title == *"[minor]"* ]]; then
            minor=$(( minor + 1 ))
            patch=0
          else
            patch=$(( patch + 1 ))
          fi
          
          new_version="$major.$minor.$patch"
          echo "NEW_VERSION=$new_version" >> $GITHUB_ENV

      - name: Create and push new tag
        run: |
          git config --global user.name 'GitHub Actions'
          git config --global user.email '[email protected]'
          git tag v${NEW_VERSION}
          git push origin v${NEW_VERSION}

      - name: Deploy to EC2
        env:
          PRIVATE_KEY: ${{ secrets.EC2_SSH_PRIVATE_KEY }}
          EC2_URL: ${{ secrets.EC2_URL }}
          NEW_VERSION: ${{ env.NEW_VERSION }}
        run: |
          echo "$PRIVATE_KEY" > temp_key.pem
          chmod 600 temp_key.pem
          ssh -o StrictHostKeyChecking=no -i temp_key.pem ${EC2_URL} << EOF
            cd spring-chatting-server
            git pull origin main
            sh run.sh ${NEW_VERSION}
          EOF
          rm temp_key.pem

Here's the CPU usage of the EC2 instance. The CPU hits 100% 😂😂😂😂

image

Assumption

Maybe it is because of the long-lived connection with EC2. Since I build multiple Gradle projects, download Docker images, rebuild them, run 12 containers, etc., the whole process takes a long time, and this makes EC2 cut off the connection.

Is there any solution for maintaining the connection between EC2 and GitHub Actions?
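
One commonly used mitigation (a sketch only; the keepalive values are arbitrary) is to enable SSH keepalives in the deploy step so the idle connection isn't dropped while the long remote script runs:

ssh -o StrictHostKeyChecking=no \
    -o ServerAliveInterval=60 \
    -o ServerAliveCountMax=120 \
    -i temp_key.pem ${EC2_URL} << EOF
  cd spring-chatting-server
  git pull origin main
  sh run.sh ${NEW_VERSION}
EOF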

BeanDefinitionOverrideException

Description

When trying to run user-server, a BeanDefinitionOverrideException occurs.

BeanDefinitionOverrideException in

  • JPA&Redis Repository
  • SpringWebSecurity.

Solution for this issue

by mbhave

spring-projects/spring-boot#26897 (comment) happens because of the explicit @EnableWebfluxSecurity annotation. Without spring.main.web-application-type=reactive, the application is considered to be a spring-mvc application, and the auto-configuration switches on @EnableWebSecurity. The two annotations result in a clash. Setting spring.main.web-application-type=reactive prevents @EnableWebSecurity from being applied which is why it works. Since Spring Boot automatically applies @EnableWebSecurity and @EnableWebFluxSecurity, explicitly adding these is not required. This can be converted to a documentation issue for the spring-hateos limitation as Scott suggested.

spring-projects/spring-boot#26897 (comment)

[Document] CI/CD will be changed to use Kubernetes

Original CI/CD flows

with AWS-ECR+EC2, docker-compose, git actions, etc.



  1. The deployment workflow gets the latest tag version
  2. Remove v prefix and separate into major, minor and patch
  3. Update the version matching your PR title and push it to github

So if v5.0.1 is the latest tag and the PR title contains **[patch]**, the operation returns an env with v5.0.2 and pushes it to github. If v5.0.2 already exists, the operation will append the commit hash to the end of the new tag, so it may be equal to v5.0.2-abf2154

  4. Run ./gradlew build --build-cache --parallel -Pversion=${new version}, which builds the .jar files and appends ${new version} to the end of each .jar file name. The Gradle scripts also create a Dockerfile that builds with those .jar files

If v5.0.2 is the new version tag, Gradle reads the new version and builds **_v5.0.2.jar. After building the .jar, the Gradle scripts create a Dockerfile that copies the matching .jar file (e.g. COPY /build/libs/shop-user-service-v5.0.2.jar /null/app.jar)

  5. Build all the Dockerfiles with compose and push all created images to ECR

ECR is quite expensive if you create multiple repositories. So I create only one repository and push all images tagged with ${service_name}_${new_version} to identify them.

  6. Access EC2 and pull all images from ECR

When pulling all the images, we have to list every image name and filter on the version postfix. For example, ECR has A-service_v5.0.2, A-service_v5.0.1, B-service_v5.0.2, C-service_v5.0.2. Split each name on the _ delimiter, find the ones whose index[1] is exactly v5.0.2, and create new tags in a for loop.

  7. Run docker-compose-prod.yaml in the background

Since I run 12 Docker containers on a t2.micro instance, it can get overloaded. I'm considering upgrading the EC2 instance, horizontal scaling, or EKS for running multiple servers.

Originally posted by @ghkdqhrbals in #78 (comment)

To put it simply, the process is:

  • Push new tag -> Build projects -> Build images -> Push to ECR -> Pull images from ECR in EC2 -> Run containers

Changed CI/CD plan

Same process

Push new tag -> Build projects -> Build images -> Push to ECR.

So ECR will have various tags like A-service_v5.0.1, B-service_v5.0.1, A-service_v5.0.2, etc.

Different process

Apply multiple service.yaml, deploy.yaml, persistent volume, etc. files to the EC2 Kubernetes master node.

But here comes the question.

Since we are using Kubernetes, we need to specify the exact name and tag of the image that we pull from ECR. How can we write the image name and tag inside every deploy.yaml file? The Kubernetes API doesn't support regular expressions. What we want is, for example, to find the images with the v5.0.1 postfix, pull them, retag them as latest, and write something like image: B-service:latest. The process becomes harder because of image versioning. Since we are using ECR, it automatically versions our images with an imageDigest, so there is no need for custom versioning, but the digest is not human-readable; it is just a hash value. To keep things human-friendly, I tag with custom versioning.

Anyway, how can we write each of the deploy.yaml files? Well... let's find out!
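
One common workaround (a sketch, not the approach this repo has settled on; the deployment, container, and registry names are placeholders) is to keep the tag out of the committed deploy.yaml and inject it at deploy time:

# patch an existing Deployment to the freshly pushed tag
kubectl set image deployment/b-service b-service=<account-id>.dkr.ecr.ap-northeast-2.amazonaws.com/chat:B-service_v5.0.2

# or template a placeholder inside deploy.yaml just before applying
sed "s|__IMAGE_TAG__|B-service_v5.0.2|g" deploy.yaml | kubectl apply -f -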

Memory management

Progressing

  • 1. Scheduling unused sink objects
    • need to create a custom object that holds the sink and its creation time (for scheduling)
  • 2. Managing multiple sink objects with a custom Reactor class via multiple methods (e.g. removing, emitting, completing, sending an error with completion)

[Document] Updated 5.2.0 Version

Updated Log 5.2.0 Version

  1. Added a login & register screen to the chat front server (feat. kokoa clone). Lots still remains 😂😂
  2. Combined the chatting backend, front, and database servers into docker-compose-only-chat.yaml so they run with one command
  3. Set profiles to separate environments for different situations (e.g. in the local profile we exclude Kafka, Eureka, and the config server; you can set these profiles in the docker-compose file).
  4. Added the bash script remove_dangling_images.sh to automatically remove all dangling images.
  5. Cleaned up unused comments in the chatting backend server, and also the front.
  6. The authentication server still has too many unused comments and functions. We have been continuously removing dirty functions, but many dirty things remain.

Pulled images from ECR, but images are still built on EC2

Description

Pulled images from ECR, but still build images in EC2.

And I think this is because the image names don't match.

The whole process looks like this.

  1. A Docker image named main-service_A-service-5.0.12-af2541 is built in GitHub Actions.
  2. The main-service_ prefix is stripped from main-service_A-service-5.0.12-af2541, leaving A-service-5.0.12-af2541.
  3. Push to ECR with the tag name A-service-5.0.12-af2541.
  4. EC2 pulls every image with the 5.0.12-af2541 postfix from ECR.
  5. Split A-service-5.0.12-af2541 into A-service as the image name and re-tag 5.0.12-af2541 as latest.
  6. Run docker-compose.

In step 5, maybe we should add the main-service_ prefix back, because docker-compose builds images with the main-service_ prefix. So if there are no images with the main-service_ prefix, compose will ignore the images that we already pulled (see the sketch below).
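
A sketch of step 5 with the prefix added back (the registry, repository, and service names here are illustrative only):

docker pull <account-id>.dkr.ecr.ap-northeast-2.amazonaws.com/chat:A-service-5.0.12-af2541
docker tag  <account-id>.dkr.ecr.ap-northeast-2.amazonaws.com/chat:A-service-5.0.12-af2541 \
            main-service_A-service:latest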

nginx Proxy Issue

I cannot redirect to /chat and /user. Here's my configuration

user  nginx;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;
events {                     
    worker_connections  1024;
}
http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    upstream chat-server {
        server chatting-server-2:8084;
    }
    upstream auth-server {
        server auth-server:8085;
    }
    server {
        listen 80;
        # server_name will be changed later
        server_name localhost;


        # chat server backend
        location /chat {
            proxy_pass         http://chat-server;
            proxy_redirect     off;
            proxy_set_header   Host $host;
            proxy_set_header   X-Real-IP $remote_addr; # forward the client's IP
            proxy_set_header   X-Forwarded-For $proxy_add_x_forwarded_for; # forward the client's IP
        }
        # auth server backend TODO
        location /user {
            rewrite ^([^.]*[^/])$ $1/ permanent; # add a trailing slash to every URL
            proxy_pass         http://auth-server?${args};
            proxy_redirect     off;
            proxy_set_header   Host $host;
            proxy_set_header   X-Real-IP $remote_addr;
            proxy_set_header   X-Forwarded-For $proxy_add_x_forwarded_for;
        }
    }
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';
    access_log  /var/log/nginx/access.log  main;
                                                
    sendfile        on;                                                                         
    keepalive_timeout  65;                                                                      
    include /etc/nginx/conf.d/*.conf;           
}

When running docker compose build, all images are skipped

Description

When running docker compose build, every service skips building.

So no Docker images are built on Ubuntu in GitHub Actions.

Run docker-compose -f docker-compose-prod.yaml build
zookeeper uses an image, skipping
kafka1 uses an image, skipping
kafdrop uses an image, skipping
rabbitmq uses an image, skipping
configuration-server uses an image, skipping
discovery-server uses an image, skipping
user-db uses an image, skipping
user-redis uses an image, skipping
user-server uses an image, skipping
chat-db uses an image, skipping
chatting-server uses an image, skipping
customer-db uses an image, skipping
customer-server uses an image, skipping
api-server uses an image, skipping
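
For reference, docker-compose only builds services that declare a build: section; services defined with only image: are pulled, never built, which matches the "uses an image, skipping" output above. A quick way to see which services compose considers buildable (a sketch, assuming the standard compose CLI):

docker-compose -f docker-compose-prod.yaml config | grep -B 3 'build:'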

PSQLException when user-server DB connection

Description

PSQLException when user-server DB connection

Compose

  user-db:
    container_name: user-db
    image: postgres:12-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=user
    expose:
      - "5435" # Publishes 5433 to other containers but NOT to host machine
    ports:
      - "5435:5435" # Exposing 5435 to host machine
    volumes:
      - ./backups:/home/backups
    command: -c wal_level=logical -p 5435
    restart: always

application.yaml

spring:
  datasource:
    url: jdbc:postgresql://localhost:5435/user # Maybe this can be the main reason...

Code

org.postgresql.util.PSQLException: The connection attempt failed.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:315) ~[postgresql-42.2.27.jar:42.2.27]
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51) ~[postgresql-42.2.27.jar:42.2.27]
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:225) ~[postgresql-42.2.27.jar:42.2.27]
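
As a sanity check (a sketch; the container, user, and database names come from the compose snippet above), you can confirm Postgres really is listening on 5435 inside the container. If this works, the failure is most likely the localhost host in the datasource URL, since containers on the compose network reach each other by service name (user-db), not localhost:

docker-compose exec user-db psql -U postgres -p 5435 -d user -c '\conninfo'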

[TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block]

When running the ELK stack, I bumped into this error.

kibana_1         | [2023-01-09T08:51:15.226+00:00][WARN ][plugins.usageCollection.usage-collection.usage-counters-service] ResponseError: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];: cluster_block_exception: [cluster_block_exception] Reason: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@elastic/transport/lib/Transport.js:476:27)
kibana_1         |     at runMicrotasks (<anonymous>)
kibana_1         |     at processTicksAndRejections (node:internal/process/task_queues:96:5)
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@kbn/core-elasticsearch-client-server-internal/target_node/src/create_transport.js:58:16)
kibana_1         |     at ClientTraced.UpdateApi [as update] (/usr/share/kibana/node_modules/@elastic/elasticsearch/lib/api/api/update.js:50:12)
kibana_1         | [2023-01-09T08:51:15.230+00:00][WARN ][plugins.usageCollection.usage-collection.usage-counters-service] ResponseError: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];: cluster_block_exception: [cluster_block_exception] Reason: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@elastic/transport/lib/Transport.js:476:27)
kibana_1         |     at runMicrotasks (<anonymous>)
kibana_1         |     at processTicksAndRejections (node:internal/process/task_queues:96:5)
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@kbn/core-elasticsearch-client-server-internal/target_node/src/create_transport.js:58:16)
kibana_1         |     at ClientTraced.UpdateApi [as update] (/usr/share/kibana/node_modules/@elastic/elasticsearch/lib/api/api/update.js:50:12)
kibana_1         | [2023-01-09T08:51:15.233+00:00][WARN ][plugins.usageCollection.usage-collection.usage-counters-service] ResponseError: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];: cluster_block_exception: [cluster_block_exception] Reason: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@elastic/transport/lib/Transport.js:476:27)
kibana_1         |     at runMicrotasks (<anonymous>)
kibana_1         |     at processTicksAndRejections (node:internal/process/task_queues:96:5)
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@kbn/core-elasticsearch-client-server-internal/target_node/src/create_transport.js:58:16)
kibana_1         |     at ClientTraced.UpdateApi [as update] (/usr/share/kibana/node_modules/@elastic/elasticsearch/lib/api/api/update.js:50:12)
kibana_1         | [2023-01-09T08:51:15.237+00:00][WARN ][plugins.usageCollection.usage-collection.usage-counters-service] ResponseError: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];: cluster_block_exception: [cluster_block_exception] Reason: index [.kibana_8.5.3_001] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@elastic/transport/lib/Transport.js:476:27)
kibana_1         |     at runMicrotasks (<anonymous>)
kibana_1         |     at processTicksAndRejections (node:internal/process/task_queues:96:5)
kibana_1         |     at KibanaTransport.request (/usr/share/kibana/node_modules/@kbn/core-elasticsearch-client-server-internal/target_node/src/create_transport.js:58:16)
kibana_1         |     at ClientTraced.UpdateApi [as update] (/usr/share/kibana/node_modules/@elastic/elasticsearch/lib/api/api/update.js:50:12)
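
Once disk space has been freed (or the watermark thresholds raised), the read-only block still has to be cleared explicitly. A sketch using the standard Elasticsearch settings APIs, assuming Elasticsearch is reachable at localhost:9200:

# temporarily raise the flood-stage watermark while recovering disk space
curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' \
  -d '{"transient": {"cluster.routing.allocation.disk.watermark.flood_stage": "97%"}}'

# remove the read_only_allow_delete block from all indices
curl -X PUT "localhost:9200/_all/_settings" -H 'Content-Type: application/json' \
  -d '{"index.blocks.read_only_allow_delete": null}'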

Multiple Topics with Kafka connector

I use the JDBC sink connector to back up my Postgres DB, and I have multiple tables and multiple topics.

TABLE

  • user_table
  • friend
  • room
  • participant
  • chatting

TOPICS

  • dbserver5434.public.user_table
  • dbserver5434.public.friend
  • dbserver5434.public.room
  • dbserver5434.public.participant
  • dbserver5434.public.chatting

And I set sink configuration as below.

{
    "name": "sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "task.max" : 5,
        "topics": "dbserver5434.public.user_table,dbserver5434.public.friend,dbserver5434.public.room,dbserver5434.public.participant,dbserver5434.public.chatting",
        
        "connection.url": "jdbc:postgresql://chatting-db-1:5433/chat1",
        "connection.user":"postgres",
        "connection.password":"password",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "tombstones.on.delete": "true",

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "transforms": "unwrap,dropPrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "auto.create": "true",
        "auto.evolve":"true",
        "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex":"dbserver5434(.*)$",
        "transforms.dropPrefix.replacement":"$1",
        "batch.size": "1"
    }
}   

When I query the backup DB, I get the following, even though the original DB has no friend_seq.

image

Is there any way to sink from multiple topics into multiple tables?

Golang Test with segmentation violation

I ran the test code below and got a segmentation violation error at m[res.StatusCode] += 1.

...
type User struct {
	UserId   string `json:"userId"`
	UserName string `json:"userName"`
	Email    string `json:"email"`
	UserPw   string `json:"userPw"`
}
type Usera struct {
	Name string
}

func init() {
	rand.Seed(time.Now().UnixNano())
}

var letterRunes = []rune("김이박최정강조윤장임한오서신권황안송류전홍고문양손배조백허유남심노정하곽성차주우구신임나전민유진지엄채원천방공강현함변염양변여추노도소신석선설마길주연방위표명기반왕금옥육인맹제모장남탁국여진어은편구용")
var letterRunes_name = []rune("가강건경고관광구규근기길나남노누다단달담대덕도동두라래로루리마만명무문미민바박백범별병보빛사산상새서석선설섭성세소솔수숙순숭슬승시신아안애엄여연영예오옥완요용우원월위유윤율으은의이익인일잎자잔장재전정제조종주준중지진찬창채천철초춘충치탐태택판하한해혁현형혜호홍화환회효훈휘희운모배부림봉혼황량린을비솜공면탁온디항후려균묵송욱휴언령섬들견추걸삼열웅분변양출타흥겸곤번식란더손술훔반빈실직흠흔악람뜸권복심헌엽학개롱평늘늬랑얀향울련")

const letterBytes_id = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

func RandStringBytes(n int, letterBytes string) string {
	b := make([]byte, n)
	for i := range b {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	return string(b)
}

func RandStringRunes_firstname(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letterRunes[rand.Intn(len(letterRunes))]
	}
	return string(b)
}

func RandStringRunes_lastname(n int) string {
	b := make([]rune, n)
	for i := range b {
		b[i] = letterRunes_name[rand.Intn(len(letterRunes_name))]
	}
	return string(b)
}

func worker(contexts *sync.Map, wg *sync.WaitGroup, requestURL string, jsonBody []byte, client *http.Client, transferRatePerSecond int, number_worker int) {
	defer wg.Done()
	fmt.Println("Threads start:", number_worker)
	// create a local map
	m := make(map[int]int)

	for i := 0; i < transferRatePerSecond; i++ {
		s := &User{
			UserId:   RandStringBytes(8, letterBytes_id),
			UserName: RandStringRunes_lastname(1) + RandStringRunes_firstname(2),
			Email:    RandStringBytes(5, letterBytes_id) + "@gmail.com",
			UserPw:   RandStringBytes(10, letterBytes_id),
		}
		buf, err := json.Marshal(s)
		if err != nil {
			log.Fatal(err)
			return
		}
		bodyReader := bytes.NewReader(buf)
		// create the request
		req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader)
		if err != nil {
			os.Exit(1)
		}

		// set the JSON header
		req.Header.Set("Content-Type", "application/json")

		// send the request and get the response
		// (the error from client.Do is discarded, so res can be nil, e.g. on a timeout)
		res, _ := client.Do(req)

		// record the status code in the local map
		m[res.StatusCode] += 1
	}

	for k, v := range m {
		result, ok := contexts.Load(k)
		if ok {
			contexts.Store(k, result.(int)+v)
		} else {
			contexts.Store(k, v)
		}
	}

}

func main() {

	argsWithoutProg := os.Args[1:]
	var wg sync.WaitGroup

	number_worker := 100
	// target URL for the HTTP requests
	requestURL := argsWithoutProg[0]

	// path of the JSON file, e.g. user
	// (the .json extension is omitted)
	jsonReq := argsWithoutProg[1]

	// total number of requests to send
	transferRatePerSecond, _ := strconv.Atoi(argsWithoutProg[2])

	// read the JSON file
	jsonFile, err := os.Open(jsonReq + ".json")
	if err != nil {
		fmt.Println(err)
	}
	defer jsonFile.Close()

	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConns = 10000    // connection pool size
	t.MaxConnsPerHost = 10000 // max connections per host
	t.MaxIdleConnsPerHost = 1

	// client setup and timeout
	client := &http.Client{
		Timeout:   1 * time.Second,
		Transport: t,
	}

	// convert the JSON file to bytes
	jsonBody, _ := ioutil.ReadAll(jsonFile)

	// concurrent map shared across goroutines
	var contexts = &sync.Map{}

	// launch worker goroutines
	for i := 0; i < number_worker; i++ {
		wg.Add(1)
		go worker(contexts, &wg, requestURL, jsonBody, client, transferRatePerSecond/number_worker, i)
	}

	wg.Wait()

	// print the request count per HTTP status code
	contexts.Range(func(k, v interface{}) bool {
		fmt.Println("status: ", k, " count: ", v)
		return true
	})
}

org.apache.kafka.connect.errors.ConnectException

initial setup

POST http://localhost:8091/connectors
{
    "name": "chatting-5433-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "task.max" : 1,
        "topics": "dbserver5434.public.user_table",
        
        "connection.url": "jdbc:postgresql://chatting-db-1:5433/chat",
        "connection.user":"postgres",
        "connection.password":"password",

        "insert.mode":"upsert",

        "table.name.format":"chat.user_table",

        "batch.size": "1"
    }
}   

And when data was passed from Kafka to the DB, an error occurred.

org.apache.kafka.connect.errors.ConnectException: Sink connector 'chatting-5433-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='dbserver5434.public.user_table',partition=0,offset=0,timestamp=1672986550637) with a HashMap value and null value schema

Failed test when comparing LocalDateTime

Description

When comparing LocalDateTime on Ubuntu (GitHub Actions), LocalDateTime.now() returns nanosecond precision (but macOS doesn't).

So we need to cut off the nanoseconds with the truncatedTo(ChronoUnit.MICROS) method.

Error Code

image

[Document] Shell script for removing dangling docker images

Description

To remove all dangling images, I created a bash script. So when you build duplicate images on your SSH server or EC2, you can easily remove all the unused duplicates.

image

It can be found at root/remove_dangling_image.sh.
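
For reference, the core of such a script can be as small as one of the following (a sketch, not necessarily the exact contents of remove_dangling_image.sh):

# remove every image ID returned by the dangling filter
docker rmi $(docker images -f "dangling=true" -q)

# or let Docker clean them up directly
docker image prune -f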

How can we group Kafka consumers into one group and assign multiple topics?

Consumers with the same group name subscribed to different topics will NOT consume messages from other topics, because the key is (topic, group)

So, we should consider how Kafka groups consumers and binds them to topics!

Reference : https://stackoverflow.com/questions/57753211/kafka-use-common-consumer-group-to-access-multiple-topics

I'm surprised that all answers with "yes" are wrong. I just tested it and having the same group.id for consumers for different topic works well and does NOT mean that they share messages, because for Kafka the key is (topic, group) rather than just (group). Here is what I did:

  1. created 2 different topics T1 and T2 with 2 partitions in each topic
  2. created 2 consumers with the same group xxx
  3. assigned consumer C1 to T1, consumer C2 to T2
  4. produced messages to T1 - only consumer C1 assigned to T1 processed them
  5. produced messages to T2 - only consumer C2 assigned to T2 processed them
  6. killed consumer C1 and repeated 4-5 steps. Only consumer C2 processed messages from T2
  7. messages from T1 were not processed

Conclusion: Consumers with the same group name subscribed to different topics will NOT consume messages from other topics, because the key is (topic, group)

To see more info, check https://ghkdqhrbals.github.io/portfolios/docs/메세지큐/2022-12-02-kafka/
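
To illustrate this in Spring Kafka terms, a minimal sketch (the topic and group names are illustrative): two listeners sharing a group id but subscribed to different topics are assigned partitions independently, because assignment is keyed by (topic, group).

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SameGroupDifferentTopicsListener {

    // Receives only messages produced to T1, within group "xxx".
    @KafkaListener(topics = "T1", groupId = "xxx")
    public void listenT1(String message) {
        System.out.println("T1: " + message);
    }

    // Receives only messages produced to T2, within the same group "xxx";
    // it never sees messages produced to T1.
    @KafkaListener(topics = "T2", groupId = "xxx")
    public void listenT2(String message) {
        System.out.println("T2: " + message);
    }
}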

Postman cannot send HTTP request

Solved

Since Postman could not send the HTTP request with a JSON body, I fixed my DTO as below.

  • Before
@Getter
@Setter
public class RequestAddFriendDTO {
    private String userId;
    private List<String> friendId;
    public RequestAddFriendDTO(String userId){
        this.userId = userId;
    }
}
  • After
@Getter
@Setter
public class RequestAddFriendDTO {
    private String userId;
    private List<String> friendId;
}

I don't know exactly why, but the constructor in the DTO caused the error; most likely, declaring it removes the implicit no-args constructor that Jackson needs to deserialize the JSON body.
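
If the convenience constructor is still wanted, a minimal sketch (assuming Lombok, which the DTO already uses) is to restore the default constructor explicitly:

import java.util.List;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
public class RequestAddFriendDTO {
    private String userId;
    private List<String> friendId;

    // The convenience constructor can coexist with the Lombok-generated no-args one.
    public RequestAddFriendDTO(String userId) {
        this.userId = userId;
    }
}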

JsonParseException: Unexpected character ('}' (code 125))

I get a List<Friend> from the backend server using WebClient. When I try to collect the Flux into a List, it fails with the following error message.

com.fasterxml.jackson.core.JsonParseException: Unexpected character ('}' (code 125)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: UNKNOWN; line: 1, column: 68453]
...

This is my code:

Flux<Friend> response = webClient.mutate()
                    .build()
                    .get()
                    .uri("/chat/friend?userId=" + loginUser.getUserId())
                    .retrieve()
                    .onStatus(
                            HttpStatus.NOT_FOUND::equals,
                            r -> r.bodyToMono(ErrorResponse.class).map(e -> new CustomThrowableException(e)))
                    .onStatus(
                            HttpStatus.UNAUTHORIZED::equals,
                            r -> r.bodyToMono(ErrorResponse.class).map(e -> new CustomThrowableException(e)))
                    .bodyToFlux(Friend.class);

List<Friend> readers = response.collect(Collectors.toList()) << ERROR!
                    .share().block();
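
A minimal alternative sketch using Reactor's built-in collector; note that the JsonParseException itself points at malformed JSON around column 68453 of the response body, so the upstream payload is worth inspecting as well:

List<Friend> readers = response
        .collectList()   // Mono<List<Friend>>; no Collectors or share() needed
        .block();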

Api-gateway ServletWebServerFactory not found exception

Description

When I run docker-compose-dev.yaml, a ServletWebServerFactory exception occurs.

Web application could not be started as there was no org.springframework.boot.web.servlet.server.ServletWebServerFactory bean defined in the context.
2023-08-06T07:21:38.680313600Z 
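
A likely cause (an assumption here, since the issue does not record the resolution) is that the api-gateway is built on Spring Cloud Gateway, which is reactive, so no servlet web server factory is ever defined. One common fix is to pin the web application type in the gateway's configuration:

# application.yaml of the api-gateway (hedged sketch)
spring:
  main:
    web-application-type: reactive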

docker composing with rebuild image + restart container + remove dangling image

Rebuild image and run as container

docker-compose -f docker-compose-prod.yaml up -d --no-deps --build <<SERVICE_NAME>>

View dangling images

docker images -f "dangling=true" -q

REPOSITORY                                    TAG                   IMAGE ID       CREATED          SIZE
spring-chatting-server_api-server             latest                6b4ea0140314   18 seconds ago   384MB
<none>                                        <none>                cdb270f9d9be   6 minutes ago    384MB
spring-chatting-server_chatting-server        latest                8cac42df32bd   23 hours ago     428MB
spring-chatting-server_customer-server        latest                7b343af93f1d   23 hours ago     427MB
spring-chatting-server_user-server            latest                9a3dd080cd2a   23 hours ago     428MB
spring-chatting-server_configuration-server   latest                7f374649bcab   25 hours ago     385MB
spring-chatting-server_discovery-server       latest                64d1cd1ae116   25 hours ago     378MB
rabbitmq                                      3-management-alpine   9e800e42f2cf   3 days ago       173MB
rabbitmq                                      3.11-management       87d31604e8cf   5 weeks ago      275MB
bitnami/zookeeper                             latest                e0d1f331bd2f   5 weeks ago      510MB
bitnami/kafka                                 latest                95454f5a72bb   5 weeks ago      551MB
spring-chatting-server_auth-server-1          latest                9ae802c165e7   5 weeks ago      406MB
docker-elk_setup                              latest                b965ae481dd2   6 weeks ago      1.29GB
spring-chatting-server_nginx                  latest                04fdc32fb130   7 weeks ago      142MB
provectuslabs/kafka-ui                        latest                03f25408f3cd   7 weeks ago      291MB
spring-chatting-server_front-server           latest                82b27f5e1d9d   7 weeks ago      722MB
cassandra                                     latest                7a7e2ce2d4da   2 months ago     356MB
spring-chatting-server_chatting-server-2      latest                de958703bd82   2 months ago     722MB
spring-chatting-server_chatting-server-1      latest                de958703bd82   2 months ago     722MB
postgres                                      12-alpine             0c2beca0db95   2 months ago     237MB
debezium/connect                              1.9                   dce6337d791b   4 months ago     902MB
docker-elk_elasticsearch                      latest                32355aa9b9dd   4 months ago     1.29GB
docker-elk_kibana                             latest                1215bb66460c   4 months ago     711MB
docker-elk_logstash                           latest                ca511a01b22c   5 months ago     740MB
confluentinc/cp-kafka                         7.2.1                 d893473a6510   9 months ago     782MB
confluentinc/cp-zookeeper                     7.2.1                 3f28db6a433d   9 months ago     782MB
obsidiandynamics/kafdrop                      latest                968db96ba800   12 months ago    619MB
confluentinc/cp-kafka                         5.3.0                 fa255d39a2d6   18 months ago    589MB
zookeeper                                     3.4.9                 3b83d9104a4c   6 years ago      129MB

gyuminhwangbo@Gyuminui-MacBookPro spring-chatting-server % docker images -f "dangling=true" -q
cdb270f9d9be

Remove dangling images

docker rmi $(docker images -f "dangling=true" -q)

K8S master node's api server `:6443` isn't opened

Description

Because the node is low on memory, Kubernetes keeps swapping out the API server and other essential components 😂😂

The scheduler, controller-manager, and api-server are constantly being turned off!!

  • Swapped out scheduler and controller-manager
ubuntu@ip-172-31-4-229:~$ kubectl get componentstatuses
Warning: v1 ComponentStatus is deprecated in v1.19+
NAME                 STATUS      MESSAGE                                                                                        ERROR
scheduler            Unhealthy   Get "https://127.0.0.1:10259/healthz": dial tcp 127.0.0.1:10259: connect: connection refused   
controller-manager   Unhealthy   Get "https://127.0.0.1:10257/healthz": dial tcp 127.0.0.1:10257: connect: connection refused   
etcd-0               Healthy     ok
  • Swapped out API server
[root@ip-172-31-2-180 ec2-user]# kubectl get csr
The connection to the server 172.31.2.180:6443 was refused - did you specify the right host or port?

Solved

We now know that these processes are swapped out because the server lacks memory, but they need to stay resident in memory.

  • So first we log in as root and turn off swap in Linux.

Swap is space used to swap memory pages to disk when memory is low. Since it is not recommended to use swap on servers running Kubernetes clusters, this command disables swap. The -a option means to disable all currently active swap space.

  • Then trace kubectl version to keep observing its behavior.
sudo -i
swapoff -a
exit
strace -eopenat kubectl version
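
As a hedged addition (not part of the original fix): swapoff -a only lasts until the next reboot, so the swap entry can also be commented out in /etc/fstab to make the setting permanent.

sudo swapoff -a
sudo sed -i '/ swap / s/^/#/' /etc/fstab   # comment out swap entries so the change survives reboots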

It's not because of a lack of CPU & memory; rather, it was an AWS EC2 Linux problem

With the Amazon Linux 2023 AMI and Kubernetes installed from the Kubernetes package repositories, it kept failing. But after switching to the Amazon Linux 2 AMI (HVM) - Kernel 5.10, SSD Volume Type and installing Kubernetes from the Google-hosted package repository, it runs successfully and stays stable!

[Kafka] injecting broker number

Description

In KafkaTopicConfig.java, we generate multiple topics, and those topics are replicated across 3 brokers. This cannot work if your Kafka infrastructure has only one broker!

When we manually changed the broker (replica) count for every topic from 3 to 1, we realized it is a time-consuming process.

So we moved this setting to kafka.broker.number in application.yaml, inject the value, and pass it to the createOrModifyTopics calls.

Now we can easily change the number of replica brokers.

  • KafkaTopicConfig.java
@Configuration
@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
public class KafkaTopicConfig {
    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Value("${kafka.broker.number}")
    private int brokerNum;

    /**
     * Create topics with the number of partition and replica.
     */
    @PostConstruct
    public void init() {
        kafkaAdmin.createOrModifyTopics(generateTopic(KafkaTopic.userReq,Integer.parseInt(KafkaTopicPartition.userReq),brokerNum));
        kafkaAdmin.createOrModifyTopics(generateTopic(KafkaTopic.userRes,Integer.parseInt(KafkaTopicPartition.userRes),brokerNum));
        kafkaAdmin.createOrModifyTopics(generateTopic(KafkaTopic.userChatRollback,Integer.parseInt(KafkaTopicPartition.userChatRollback),brokerNum));
        kafkaAdmin.createOrModifyTopics(generateTopic(KafkaTopic.userCustomerRollback,Integer.parseInt(KafkaTopicPartition.userCustomerRollback),brokerNum));
    }

    /**
     * Create topic with the number of partition and replica.
     * The number of partition should be greater than or equal to the number of consumer in a group.
     * sizeOf.Partition >= sizeOf.Consumer
     * If the number of partition is 2, then 2 consumer can be matched with 2 leader-partition
     *
     * @param topicName
     * @param partitionNum
     * @param brokerNum
     * @return NewTopic
     */
    private NewTopic generateTopic(String topicName,int partitionNum, int brokerNum) {
        return TopicBuilder.name(topicName)
                .partitions(partitionNum) // the number of partitions
                .replicas(brokerNum) // the number of replica sync brokers
                .build(); // topics will have 2 leader-partition, 4 follow-partition
    }

}
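
For completeness, a hedged sketch of the matching application.yaml entry (the property names follow the @Value and @ConditionalOnProperty annotations above; the value itself depends on the environment):

kafka:
  enabled: true
  broker:
    number: 1   # replication factor: 1 for a local single-broker setup, 3 when three brokers are running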

Upgrading the EC2 instance from t2.micro to t2.medium and configuring K8S

Description

Since our server's remaining resources are very low, as mentioned before in #113 , we have to upgrade or scale the server. With AWS, it's very easy to upgrade an instance (scaling vertically rather than horizontally).

So I changed the EC2 instance type from t2.micro to t2.medium, which has 2 vCPU and 4GB of memory, and kept watching CPU and memory usage after running the server. It works! But at some point there are concerns 😂

It could be fine, but at the same time it could be only a temporary fix. What if our service needs to be scaled up? What if we need to scale only some of the services?

Considering these concerns, we chose to move forward with K8S!

We now plan to combine multiple EC2 instances into a Kubernetes cluster. Using an AWS EKS cluster would be easier than configuring it myself, but building it all by myself gives me more hands-on technical experience.

This will proceed after the servers are stabilized.

No `spring-chatting-server_` prefix is set

Description

When running the current docker compose build, the images are built as shown below. Here we can see that no spring-chatting-server_ prefix is set!

api-server                                    latest                48282233a336   20 seconds ago   384MB
customer-server                               latest                d823a09e3a32   24 seconds ago   427MB
chatting-server                               latest                b0098ed98fcf   31 seconds ago   431MB
user-server                                   latest                57c0356c9156   38 seconds ago   439MB
discovery-server                              latest                701e82141984   43 seconds ago   378MB
configuration-server                          latest                08317650c3c6   48 seconds ago   385MB

I forgot how the image step works.

When we run compose build with the code below, the built image name will be configuration-server and the tag will be latest 👍

  configuration-server:
    container_name: configuration-server
    build:
      context: ./config-service
      dockerfile: Dockerfile
    image: configuration-server:latest
    expose:
      - 8888
    ports:
      - "8888:8888"
    environment:
      SPRING_RABBITMQ_HOST: rabbitmq
      spring_cloud_config_server_git_uri: https://github.com/ghkdqhrbals/spring-chatting-server
    depends_on:
      - rabbitmq
    restart: always

That means we have to add the spring-chatting-server_ prefix here! Since we run this build in Git Actions, that environment also holds other Docker images, so we need to distinguish our images from Git Actions' own.

Rather than simply using spring-chatting-server_ as the prefix, main-service_ would be more appropriate!
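
A minimal sketch of the change, assuming the prefix is applied directly in the compose image field:

  configuration-server:
    build:
      context: ./config-service
      dockerfile: Dockerfile
    image: main-service_configuration-server:latest   # prefixed so our images are distinguishable in Git Actions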

Connector configuration is invalid and contains the following 1 error(s) Error while validating connector config: Connection to localhost:5434 refused

When I sent the configuration to my source connector, I got the error below.

{
    "error_code": 400,
    "message": "Connector configuration is invalid and contains the following 1 error(s):\nError while validating connector config: Connection to localhost:5434 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

Here's my request in JSON:

POST http://localhost:8083/connectors
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5434",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname" : "chat",
        "database.server.name": "dbserver1"
    }
}   
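
A likely cause (an assumption, since the issue does not record the resolution) is that Kafka Connect runs inside Docker, so localhost resolves to the Connect container itself rather than to Postgres. Pointing database.hostname at the Postgres container's service name on the compose network would look roughly like this (the container name here is hypothetical):

        "database.hostname": "chatting-db-2",
        "database.port": "5434",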

How to manage memory leak issues in Sinks Flux

Description

Since we use Reactive Streams, we have to explicitly set and manage termination conditions. This is a demanding, time-consuming task: if we miss a stream termination, a memory leak occurs. I will explain the current process of using the reactive stream.

We use a ConcurrentHashMap of Sinks.Many to communicate with other threads.

  • The sinkMap static field lives in the method area, which is not a GC target.
  • The objects held in sinkMap live in the heap, which is a GC target.
  1. We declare sinkMap as a ConcurrentHashMap for thread safety.
@Configuration
@Slf4j
public class AsyncConfig  {
    public static ConcurrentHashMap<String, Sinks.Many<Object>> sinkMap = new ConcurrentHashMap<>();
    ...
}
  2. We put a Sinks.many().multicast().onBackpressureBuffer() into sinkMap when the server gets a request from the client.
@PostMapping(value = "/user", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<?> addUser(@RequestBody RequestUser req) throws InterruptedException {
        ...
        AsyncConfig.sinkMap.put(req.getUserId(), Sinks.many().multicast().onBackpressureBuffer());
        ...
}
  3. We take this backpressure buffer out in other threads. Using .tryEmitNext(), other threads can send values into the reactive stream.
    // kafka listener thread
    private void checkCurrentStatus(UserTransactions userTransactions,String userId) {
        if (!userTransactions.getChatStatus().equals(UserResponseStatus.USER_APPEND.name())
                && !userTransactions.getCustomerStatus().equals(UserResponseStatus.USER_APPEND.name())
                && !userTransactions.getUserStatus().equals(UserStatus.USER_INSERT.name())
                && !userTransactions.getUserStatus().equals(UserStatus.USER_DELETE.name())){
            AsyncConfig.sinkMap.get(userId).tryEmitNext(userTransactions);
            AsyncConfig.sinkMap.get(userId).tryEmitComplete();
            AsyncConfig.sinkMap.remove(userId);
        }
        else{
            AsyncConfig.sinkMap.get(userId).tryEmitNext(userTransactions);
        }
    }
  4. After all threads finish sending values, .tryEmitComplete() or .tryEmitError() is called.
@KafkaListener(topics = KafkaTopic.userRes, containerFactory = "userKafkaListenerContainerFactory", concurrency = KafkaTopicPartition.userRes)
    public void listenUser(UserResponseEvent req) {
        userService.updateStatus(req).exceptionally(e->{
            log.info("There is no event transaction");
            AsyncConfig.sinkMap.get(req.getUserId()).tryEmitError(e); // 
            AsyncConfig.sinkMap.remove(req.getUserId()); // removing object in hashMap
            return null;
        });
    }
  5. The reactive stream created by Sinks.many() is then torn down.

  6. Now the reactive stream's resources are released, and its entry is removed from sinkMap.

Since the reactive stream has no automatic termination, we need to keep an eye on every step of this process.

Flux, on the other hand, offers automatic termination operators such as .take() and .timeout(Duration.ofSeconds(1)).
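
For illustration, a hedged sketch of such a Flux-side safety net (assuming the controller returns the sink's flux; the names mirror the snippets above):

return AsyncConfig.sinkMap.get(req.getUserId())
        .asFlux()
        // terminate the stream even if a downstream service never replies
        .timeout(Duration.ofSeconds(1))
        // clean the entry out of sinkMap on complete, error, or cancel
        .doFinally(signal -> AsyncConfig.sinkMap.remove(req.getUserId()));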

[ERROR] The given id must not be null!

Solved

Since the chatting entity uses a generated ID, chatting.getId() is null until the entity is persisted.
I called getId() before JPA had persisted the entity, so of course the ID field was still null.

Optional<Chatting> findChatting = chatRepository.findById(chatting.getId());
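
A minimal sketch of the fix, assuming a standard Spring Data JPA repository: persist the entity first so the generated ID exists before it is looked up.

// save() returns the managed entity with its generated ID populated
Chatting saved = chatRepository.save(chatting);
Optional<Chatting> findChatting = chatRepository.findById(saved.getId());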
