Programming/Python

카프카(KAFKA) 윈도우(window) 설치 및 테스트

빠릿베짱이 2018. 9. 13. 18:10
반응형

 Kafka windows 설치

 

다운로드 : http://mirror.navercorp.com/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz


zookeeper 실행 : bin\windows\zookeeper-server-start.bat config\zookeeper.properties

zookeeper.bat 파일 생성하고 아래와 같이 작성하면 실행됨

C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\bin\windows\zookeeper-server-start.bat C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\config\zookeeper.properties


kafka 실행 : > bin\windows\kafka-server-start.bat config\server.properties

kafka.bat 파일 생성하고 아래와 같이 작성하면 실행됨

C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\bin\windows\kafka-server-start.bat C:\Users\Junny\Downloads\kafka_2.11-1.1.0\kafka_2.11-1.1.0\config\server.properties 



python에서 kafka-python 설치

> pip install kafka-python


속도는 confluent-kafka-python 빠르다고 한다.

window에서 돌리는 방법은 아직 찾지 못했음.

링크 : https://github.com/confluentinc/confluent-kafka-python



 Kafka GUI tool

 링크 : http://www.kafkatool.com/features.html


토픽 생성 가능

파티션 갯수 가능

복사본의 경우 로컬에서 여러개 하니 오류 발생

C:\tmp\kafka-logs 경로에 로그 파일이 생김



Image stream processing in kafka

Server.py --> Producer

#!/usr/bin/python
import asyncio
import logging
from kafka import KafkaProducer


log = logging.getLogger(__name__)
clients = {} # task -> (reader, writer)

class Server:
def __init__(self, loop):
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
self.loop = loop
f = asyncio.start_server(self.accept_client, host=None, port=9999)
self.loop.run_until_complete(f)
self.loop.run_forever()

def accept_client(self, client_reader, client_writer):
task = asyncio.Task(self.handle_client(client_reader, client_writer))
clients[task] = (client_reader, client_writer)
def client_done(task):
del clients[task]
client_writer.close()
# log.info("End Connection")

# log.info("New Connection")
task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(self, client_reader, client_writer):
# give client a chance to respond, timeout after 10 seconds
data = yield from asyncio.wait_for(client_reader.read(4), timeout=10.0)

img_size = int.from_bytes(data, byteorder='big', signed=True)

data = yield from asyncio.wait_for(client_reader.read(img_size), timeout=10.0)
if len(data):
self.producer.send('my-topic', data, key="12343".encode())
if data is None:
log.warning("Expected WORLD, received None")
return

def main():
loop = asyncio.get_event_loop()
server = Server(loop)

if __name__ == '__main__':
log = logging.getLogger("")
formatter = logging.Formatter("%(asctime)s %(levelname)s " + "[%(module)s:%(lineno)d] %(message)s")
# setup console logging
log.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
log.addHandler(ch)
main()


Consumer.py

#!/usr/bin/env python
import logging
import cv2
import numpy as np
from kafka import KafkaConsumer


if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
consumer = KafkaConsumer('my-topic',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
group_id="sijoo82",
consumer_timeout_ms=1000)
while True:
for message in consumer:
data = np.fromstring(message.value, dtype='uint8')
print("%s:%d:%d: key=%s" % (message.topic, message.partition, message.offset, message.key))
# consumer.commit()
decimg = cv2.imdecode(data, 0)
cv2.imshow("recv", decimg)
cv2.waitKey(1)

consumer.close()



image send


#!/usr/bin/python
import socket
import cv2
import numpy as np
import logging


encode_param= [int(cv2.IMWRITE_JPEG_QUALITY), 90]

if __name__ == "__main__":
cam = cv2.VideoCapture(0)
cnt = 0
while True:
ret_val, img = cam.read()
img = cv2.flip(img, -1)
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = cv2.transpose(img)
img = cv2.resize(img, (240, 320), interpolation=cv2.INTER_LINEAR)
img = cv2.copyMakeBorder(img, 0, 0, 40, 40, cv2.BORDER_CONSTANT, value=0)
result, encode_image = cv2.imencode('.jpg', img[:, :], encode_param)
data = np.array(encode_image)
string_image = data.tostring()

sock = socket.socket()
sock.connect(("127.0.0.1", 9999))
sock.send((len(string_image)).to_bytes(4, byteorder='big'))
print("Send Image!!! {} size: {}".format(cnt, len(string_image)))
cnt += 1
sock.send(string_image)
sock.close()





반응형