다운로드 : http://mirror.navercorp.com/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
zookeeper 실행 : bin\windows\zookeeper-server-start.bat config\zookeeper.properties
zookeeper.bat 파일 생성하고 아래와 같이 작성하면 실행됨
C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\bin\windows\zookeeper-server-start.bat C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\config\zookeeper.properties
|
kafka 실행 : > bin\windows\kafka-server-start.bat config\server.properties
kafka.bat 파일 생성하고 아래와 같이 작성하면 실행됨
C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\bin\windows\kafka-server-start.bat C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\config\server.properties |
python에서 kafka-python 설치
> pip install kafka-python
속도는 confluent-kafka-python 빠르다고 한다.
window에서 돌리는 방법은 아직 찾지 못했음.
링크 : https://github.com/confluentinc/confluent-kafka-python
#!/usr/bin/python import asyncio import logging from kafka import KafkaProducer
log = logging.getLogger(__name__) clients = {} # task -> (reader, writer)
class Server: def __init__(self, loop): self.producer = KafkaProducer(bootstrap_servers='localhost:9092') self.loop = loop f = asyncio.start_server(self.accept_client, host=None, port=9999) self.loop.run_until_complete(f) self.loop.run_forever()
def accept_client(self, client_reader, client_writer): task = asyncio.Task(self.handle_client(client_reader, client_writer)) clients[task] = (client_reader, client_writer) def client_done(task): del clients[task] client_writer.close() # log.info("End Connection")
# log.info("New Connection") task.add_done_callback(client_done)
@asyncio.coroutine def handle_client(self, client_reader, client_writer): # give client a chance to respond, timeout after 10 seconds data = yield from asyncio.wait_for(client_reader.read(4), timeout=10.0)
img_size = int.from_bytes(data, byteorder='big', signed=True)
data = yield from asyncio.wait_for(client_reader.read(img_size), timeout=10.0) if len(data): self.producer.send('my-topic', data, key="12343".encode()) if data is None: log.warning("Expected WORLD, received None") return
def main(): loop = asyncio.get_event_loop() server = Server(loop)
if __name__ == '__main__': log = logging.getLogger("") formatter = logging.Formatter("%(asctime)s %(levelname)s " + "[%(module)s:%(lineno)d] %(message)s") # setup console logging log.setLevel(logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.INFO) ch.setFormatter(formatter) log.addHandler(ch) main()
|
Consumer.py
#!/usr/bin/env python import logging import cv2 import numpy as np from kafka import KafkaConsumer
if __name__ == "__main__": logging.basicConfig( format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', level=logging.INFO ) consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092', auto_offset_reset='earliest', group_id="sijoo82", consumer_timeout_ms=1000) while True: for message in consumer: data = np.fromstring(message.value, dtype='uint8') print("%s:%d:%d: key=%s" % (message.topic, message.partition, message.offset, message.key)) # consumer.commit() decimg = cv2.imdecode(data, 0) cv2.imshow("recv", decimg) cv2.waitKey(1)
consumer.close()
|
image send
#!/usr/bin/python import socket import cv2 import numpy as np import logging
encode_param= [int(cv2.IMWRITE_JPEG_QUALITY), 90]
if __name__ == "__main__": cam = cv2.VideoCapture(0) cnt = 0 while True: ret_val, img = cam.read() img = cv2.flip(img, -1) img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) img = cv2.transpose(img) img = cv2.resize(img, (240, 320), interpolation=cv2.INTER_LINEAR) img = cv2.copyMakeBorder(img, 0, 0, 40, 40, cv2.BORDER_CONSTANT, value=0) result, encode_image = cv2.imencode('.jpg', img[:, :], encode_param) data = np.array(encode_image) string_image = data.tostring()
sock = socket.socket() sock.connect(("127.0.0.1", 9999)) sock.send((len(string_image)).to_bytes(4, byteorder='big')) print("Send Image!!! {} size: {}".format(cnt, len(string_image))) cnt += 1 sock.send(string_image) sock.close()
|