다운로드 : http://mirror.navercorp.com/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
zookeeper 실행 : bin\windows\zookeeper-server-start.bat config\zookeeper.properties
zookeeper.bat 파일 생성하고 아래와 같이 작성하면 실행됨
C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\bin\windows\zookeeper-server-start.bat C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\config\zookeeper.properties
kafka 실행 : > bin\windows\kafka-server-start.bat config\server.properties
kafka.bat 파일 생성하고 아래와 같이 작성하면 실행됨
C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\bin\windows\kafka-server-start.bat C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\config\server.properties
python에서 kafka-python 설치
> pip install kafka-python
속도는 confluent-kafka-python이 더 빠르다고 한다.
Windows에서 돌리는 방법은 아직 찾지 못했음.
링크 : https://github.com/confluentinc/confluent-kafka-python
#!/usr/bin/python
"""Asyncio TCP server that forwards length-prefixed payloads to Kafka.

Each client connection is expected to send a 4-byte big-endian length
followed by exactly that many payload bytes. The payload is published
unchanged to the ``my-topic`` Kafka topic.
"""
import asyncio
import logging

from kafka import KafkaProducer

log = logging.getLogger(__name__)

clients = {}  # task -> (reader, writer)


class Server:
    """Accept TCP clients on port 9999 and publish their payloads to Kafka."""

    def __init__(self, loop):
        """Create the producer, start listening and run ``loop`` forever.

        Note that this constructor blocks: it does not return until the
        event loop is stopped.

        Args:
            loop: The asyncio event loop that serves the connections.
        """
        self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
        self.loop = loop
        f = asyncio.start_server(self.accept_client, host=None, port=9999)
        self.loop.run_until_complete(f)
        self.loop.run_forever()

    def accept_client(self, client_reader, client_writer):
        """Spawn a handler task for a newly connected client.

        The task is tracked in the module-level ``clients`` mapping until it
        finishes, at which point the connection is closed.
        """
        task = self.loop.create_task(
            self.handle_client(client_reader, client_writer))
        clients[task] = (client_reader, client_writer)

        def client_done(task):
            clients.pop(task, None)
            client_writer.close()
            # Retrieve the outcome so a failure is logged here instead of
            # surfacing later as "Task exception was never retrieved".
            if task.cancelled():
                return
            exc = task.exception()
            if exc is not None:
                log.warning("Client handler failed: %r", exc)

        task.add_done_callback(client_done)

    async def handle_client(self, client_reader, client_writer):
        """Read one length-prefixed payload and publish it to Kafka.

        Each read is given 10 seconds. A timeout, a short read or a
        non-positive length prefix drops the connection without publishing.
        """
        try:
            # readexactly() is required here: read(n) may return fewer than
            # n bytes, which would truncate the header or the payload.
            header = await asyncio.wait_for(
                client_reader.readexactly(4), timeout=10.0)
            img_size = int.from_bytes(header, byteorder='big', signed=True)
            if img_size <= 0:
                log.warning("Ignoring client with invalid payload size %d",
                            img_size)
                return
            data = await asyncio.wait_for(
                client_reader.readexactly(img_size), timeout=10.0)
        except asyncio.IncompleteReadError as exc:
            log.warning("Connection closed after %d of %s expected bytes",
                        len(exc.partial), exc.expected)
            return
        except asyncio.TimeoutError:
            log.warning("Timed out waiting for client data")
            return

        self.producer.send('my-topic', data, key="12343".encode())


def main():
    """Create an event loop and run the server on it (blocks forever)."""
    # An explicit loop avoids relying on get_event_loop() implicitly creating
    # one, which is deprecated when no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    Server(loop)


if __name__ == '__main__':
    # Rebind the module logger to the root logger and attach a console
    # handler so every record at INFO and above is printed.
    log = logging.getLogger("")
    formatter = logging.Formatter("%(asctime)s %(levelname)s " +
                                  "[%(module)s:%(lineno)d] %(message)s")
    # setup console logging
    log.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()
Consumer.py
#!/usr/bin/env python
"""Kafka consumer that decodes image payloads from ``my-topic`` and shows them."""
import logging

import cv2
import numpy as np
from kafka import KafkaConsumer

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             group_id="sijoo82",
                             consumer_timeout_ms=1000)
    try:
        # consumer_timeout_ms ends the inner iteration after 1 s without
        # messages; the outer loop simply starts polling again.
        while True:
            for message in consumer:
                print("%s:%d:%d: key=%s" % (message.topic, message.partition,
                                             message.offset, message.key))
                # consumer.commit()
                if not message.value:
                    # Nothing to decode (empty or null payload).
                    continue
                # frombuffer replaces the deprecated binary np.fromstring.
                data = np.frombuffer(message.value, dtype='uint8')
                decimg = cv2.imdecode(data, 0)  # 0 == cv2.IMREAD_GRAYSCALE
                if decimg is None:
                    # imdecode returns None for data it cannot decode;
                    # passing that to imshow would raise.
                    logging.warning("Skipping undecodable message at offset %d",
                                    message.offset)
                    continue
                cv2.imshow("recv", decimg)
                cv2.waitKey(1)
    except KeyboardInterrupt:
        pass
    finally:
        # The original close() call sat after the infinite loop and could
        # never run; always release the consumer on the way out.
        consumer.close()
        cv2.destroyAllWindows()
image send
#!/usr/bin/python
"""Capture webcam frames, JPEG-encode them and send each to a TCP server.

Every frame is sent over its own connection to 127.0.0.1:9999 as a 4-byte
big-endian length followed by the JPEG bytes.
"""
import socket
import cv2
import numpy as np
import logging

encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 90]

if __name__ == "__main__":
    cam = cv2.VideoCapture(0)
    cnt = 0
    try:
        while True:
            ret_val, img = cam.read()
            if not ret_val:
                # No frame was returned; the processing below would fail on
                # it, so stop cleanly instead.
                logging.error("Failed to read a frame from the camera")
                break

            img = cv2.flip(img, -1)  # negative flip code: flip both axes
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            img = cv2.transpose(img)
            # dsize is (width, height)
            img = cv2.resize(img, (240, 320), interpolation=cv2.INTER_LINEAR)
            # Pad 40 px on the left and right (top/bottom stay 0).
            img = cv2.copyMakeBorder(img, 0, 0, 40, 40,
                                     cv2.BORDER_CONSTANT, value=0)

            result, encode_image = cv2.imencode('.jpg', img, encode_param)
            if not result:
                logging.error("JPEG encoding failed; skipping frame")
                continue
            # tobytes() replaces the deprecated ndarray.tostring().
            string_image = encode_image.tobytes()

            # The context manager closes the socket even if a send fails.
            with socket.socket() as sock:
                sock.connect(("127.0.0.1", 9999))
                # sendall() keeps writing until every byte is sent; send()
                # may transmit only part of the buffer.
                sock.sendall(len(string_image).to_bytes(4, byteorder='big'))
                print("Send Image!!! {} size: {}".format(cnt, len(string_image)))
                cnt += 1
                sock.sendall(string_image)
    except KeyboardInterrupt:
        pass
    finally:
        cam.release()
공유하기
URL 복사 카카오톡 공유 페이스북 공유 엑스 공유