Code Monkey home page Code Monkey logo

valkka-multiprocess's People

Contributors

elsampsa avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

valkka-multiprocess's Issues

Synchronizing multiple RTSP camera streams

Hi.

First of all, thank you so much for this kind of great tool for real-time data streaming in python.

I will apologize and reopen the issue in another repo if it will be revealed that it's not related to valkka-multiprocess.

I tried valkka-core with 10 cameras with hardware accelerated setup and that was more than what I need. FPS is >= 20.

Now, I got stuck synchronization. I need to collect every coming frame from all cameras and save into Redis array in the following format:

{
  "cam1": "path/to/cam1/frame1.jpg",
  "cam2": "path/to/cam1/frame1.jpg",
  ...
  ...
  "cam10": "path/to/cam10/frame1.jpg"
}

Here is my code:

Server

class LiveStream:
	def __init__(self, shmem_buffers, shmem_name, address, slot, width, height):
		self.shmem_buffers = shmem_buffers
		self.shmem_name = shmem_name
		self.address = f"rtsp://{USER}:{PASSWORD}@{address}:554"
		self.slot = slot

		# RBGShmem Filter
		self.shmem_filter = RGBShmemFrameFilter(self.shmem_name, shmem_buffers, width, height)
		# self.shmem_filter = BriefInfoFrameFilter(self.shmem_name)  # For Debugging

		# SWS Filter
		self.sws_filter = SwScaleFrameFilter(f"sws_{self.shmem_name}", width, height, self.shmem_filter)
		# self.interval_filter = TimeIntervalFrameFilter("interval_filter", 0, self.sws_filter)

		# decoding part
		self.avthread = AVThread("avthread", self.sws_filter)
		self.av_in_filter = self.avthread.getFrameFilter()

		# define connection to camera
		self.ctx = LiveConnectionContext(LiveConnectionType_rtsp, self.address, self.slot, self.av_in_filter)

		self.avthread.startCall()
		self.avthread.decodingOnCall()

	def close(self):
		self.avthread.decodingOffCall()
		self.avthread.stopCall()


def start_valkka(camera_indexes, shmem_buffers, width, height, shmem_names=[], prod=False):
	livestreams = {}

	# Defining livethread
	livethread = LiveThread("livethread")

	# Defining filter
	shmem_filters = {}
	sws_filters = {}
	for i, cam in enumerate(camera_indexes, start=1):
		livestreams[i] = LiveStream(
			shmem_buffers=shmem_buffers,
			shmem_name=f"{cam}",
			address=cam,
			slot=i,
			width=width,
			height=height
		)

	# Start livethread
	livethread.startCall()

	# Register context to livethread
	for livestream in livestreams.values():
		livethread.registerStreamCall(livestream.ctx)
		livethread.playStreamCall(livestream.ctx)
		shmem_names.append(livestream.shmem_name)

	if prod:
		while True:
			continue
	else:
		time.sleep(30)

	for livestream in livestreams.values():
		livestream.close()
	livethread.stopCall()
	print("Bye")
	return True

Client

import threading
import json
import cv2
import redis
from termcolor import colored
from valkka.api2 import ShmemRGBClient

from config import get_configs

conf = get_configs()
FRAMES_SAVE_TO = conf.get("frames_path")
REDIS_HOST = conf.get("redis")["host"]
REDIS_PORT = conf.get("redis")["port"]
REDIS_DB = conf.get("redis")["db"]


def save_frame(shmem_name, shmem_buffers, width, height, mstimeout):
	client = ShmemRGBClient(
		name=shmem_name,
		n_ringbuffer=shmem_buffers,
		width=width,
		height=height,
		mstimeout=mstimeout,
		verbose=False
	)
	lock = threading.Lock()
	while True:
		index, meta = client.pullFrame()
		if index is None:
			print(colored("No frame...", "red"))
			continue
		data = client.shmem_list[index][0:meta.size]
		img = data.reshape((meta.height, meta.width, 3))
		img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
		image = FRAMES_SAVE_TO + f"{shmem_name}/{meta.mstimestamp}.jpg"
		# lock.acquire()
		# push2redis(shmem_name, image)
		# lock.release()
		cv2.imwrite(image, img)

		# img = imutils.resize(img, 640, 360)
		# cv2.imshow(shmem_name, img)
		# cv2.waitKey(1)

Main

import threading
import time
from config import get_configs
from client import save_frame

from server import start_valkka

# Load config
conf = get_configs()
camera_indexes = conf.get("cameras", [0])
width = conf.get("image_width", 1920)
height = conf.get("image_height", 1080)
shmem_buffers = conf.get("shared_memory_buffers", 10)
mstimeout = conf.get("mstimeout", 10)

shmem_names = []
client_threads = []

if __name__ == '__main__':
	# Start server thread
	print("Starting Camera...")
	server_thread = threading.Thread(target=start_valkka, args=(camera_indexes, shmem_buffers, width, height, shmem_names, False))
	server_thread.start()
	time.sleep(3)
	print(f"Total shmem: {len(shmem_names)}")

	# Start client thread
	for shmem_name in shmem_names:
		thread = threading.Thread(target=save_frame, args=(shmem_name, shmem_buffers, width, height, mstimeout))
		client_threads.append(thread)
		thread.start()

	time.sleep(60)

	server_thread.join()
	for thread in client_threads:
		thread.join()

Actually, threading.Lock() is working, but performance is not what we need: almost 15fps. At least 20FPS needed for image analysis model.

What I tried:

  • integrating existing code to valkka-multiprocess by the examples given in the tutorials.

And I got what is "the other side" approach (as you described it with Stranger Things😉). However, I cannot find out how to setup multiple cameras streaming and synchronizing them.

Specifically, self.eg = EventGroup(10) this line in the 2nd tutorial.

I want to realize:

  • MyProcess can/should be started multiple times as I did in the existing code or somehow multiple threads can/should be handled inside this process.
  • how EventGroup related to multiple cameras(if it really does)

Thanks for any kind of answers/suggestions beforehand.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.