-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol.py
More file actions
134 lines (119 loc) · 5.31 KB
/
protocol.py
File metadata and controls
134 lines (119 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""
This module provides a Twisted Service that handles the output of
a client(logforwarder) that streams log records in plain text over TCP.
logforwarder is the source, this is the sink.
"""
import os
import sys
import json
import twisted
from os import path
import logging.config
import logging.handlers
from struct import unpack
from model import LOGS_DIR, SERVER_MODEL
from twisted.python import log
from twisted.internet.protocol import Protocol, Factory
log.startLogging(sys.stdout)
LONG_INT_LEN = 4
class LoggingProtocol(Protocol):
"""Encapsulates the actual handling of the data received by the
protocol. It collects all incoming data, building and forwarding complete
log records as they arrive.
"""
def __init__(self):
"""Get a "loggingserver" logger, and set up our buffer and record
instance variables.
"""
log.msg("Creating new LoggingProtocol object")
self.logger = None
self.buffer = ""
self.buffer_len = self.full_buffer_len = 0
self.rec_len = None
def connectionMade(self):
self.factory.num_protocols += 1
log.msg('there are currently {} connections'.format(
self.factory.num_protocols))
def dataReceived(self, data):
"""Called whenever there's data available in the socket.
"""
# First, paste the recieved data onto what we have and compute the
# buffer's length only once rather than every time we need it.
self.buffer += data
self.buffer_len = len(self.buffer)
# Keep processing the buffer, peeling off logging records, till we
# no longer have a complete record, then exit. We'll get called again
# as soon as there's more data available.
while True:
# If we've not yet gotten the record length for the next record,
# and we have enough data to get it, do so.
if not self.rec_len and self.buffer_len >= LONG_INT_LEN:
self.rec_len = unpack(">L", self.buffer[:LONG_INT_LEN])[0]
self.full_buffer_len = LONG_INT_LEN + self.rec_len
# If we've gotten the length, and there's enough data in the
# buffer to build our record, do so.
#
# Otherwise, we're done (for now).
if (self.rec_len and self.buffer_len >= self.full_buffer_len):
# get the plain log message, from the end of the length bytes
# to the end of full_buffer_len i.e. just the data
pure_data = self.buffer[LONG_INT_LEN:self.full_buffer_len]
# extract python dictionary from the json string
obj = json.loads(pure_data)
# use the factory helper to get a logger instance
# insert the log entry using the same instance
log.msg("LoggingProtocol: logging new record")
logger = self.factory.get_logger(obj.get('token'),
obj.get('type'),
obj.get('name'))
# strip newline character while logging
logger.debug(obj.get('data').strip())
# track the object in our models
SERVER_MODEL.logRecordHandler(obj)
# Adjust our buffer to point past the end of what we just
# processed and recompute the length
self.buffer = self.buffer[self.full_buffer_len:]
self.buffer_len = len(self.buffer)
# Unset self.rec_len and self.full_buffer_len since we don't
# yet know the length of the next one. When we loop around,
# we'll take care of that if we've got enough data to work on.
self.rec_len = self.full_buffer_len = None
else:
# otherwise, we either don't know the length,
# or don't have a complete record, done for now
break
def connectionLost(self, reason):
log.msg("connectionLost called")
self.factory.num_protocols -= 1
self.buffer = ""
def handle_quit(self):
log.msg("handle_quit called")
self.transport.loseConnection()
class LoggingFactory(Factory):
"""Factory that creates the LoggingProtocol object"""
protocol = LoggingProtocol
def __init__(self):
self.num_protocols = 0
self.logger_cache = {}
def get_logger(self, token, _type, name):
logger_name = '.'.join([token, _type, name])
if logger_name in self.logger_cache:
return self.logger_cache.get(logger_name)
else:
logger = self.instantiate_logger(token, _type, name)
self.logger_cache[logger_name] = logger
return logger
@staticmethod
def instantiate_logger(token, _type, name):
client_log_dir = path.join(LOGS_DIR, token, _type)
if not path.exists(client_log_dir):
os.makedirs(client_log_dir)
logger_name = '.'.join([token, _type, name])
logger_file = path.join(client_log_dir, name)
logger = logging.getLogger(logger_name)
rfg = logging.handlers.TimedRotatingFileHandler(
logger_file, when='D', backupCount=6)
logger.setLevel(logging.DEBUG)
rfg.setLevel(logging.DEBUG)
logger.addHandler(rfg)
return logger