###############################################################################
#
# CoCalc: Collaborative Calculation in the Cloud
#
# Copyright (C) 2016 -- 2017, SageMath, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################

# This is a small helper class to record real-time metrics about the hub,
# so that a local process can easily check the hub's health.
# After an initial version, it was repurposed to use Prometheus.
# It wraps the prom-client primitives and adds instrumentation to some hub components.

fs         = require('fs')
path       = require('path')
underscore = require('underscore')
{execSync} = require('child_process')
{defaults} = misc = require('smc-util/misc')

# Prometheus client setup -- https://github.com/siimon/prom-client
prom_client = require('prom-client')
prom_default_metrics = prom_client.defaultMetrics
# additionally, record GC statistics
# https://www.npmjs.com/package/prometheus-gc-stats
require('prometheus-gc-stats')()()
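# note on the double call above: prometheus-gc-stats exports a factory -- the
# first call configures it (here: with its defaults) and returns a function
# that is then called to start collecting GC metrics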

# some constants
FREQ_s  = 10 # update stats every FREQ_s seconds
DELAY_s = 5  # with an initial delay of DELAY_s seconds
#DISC_LEN   = 10   # length of queue for recording discrete values
#MAX_BUFFER = 1000 # max. size of buffered values, which are cleared in the @_update step

# CLK_TCK: clock ticks per second (usually 100, but maybe not ...)
try
    CLK_TCK = parseInt(execSync('getconf CLK_TCK', {encoding: 'utf8'}))
catch err
    CLK_TCK = null
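# CLK_TCK is used below to convert the tick counts from /proc/<pid>/stat into
# seconds: with the typical value of 100, e.g. 4200 utime ticks = 42 CPU seconds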

###
# exponential smoothing, based on linux's load 1-exp(-1) smoothing
# with compensation for sampling time FREQ_s
d = 1 - Math.pow(Math.exp(-1), FREQ_s / 60)
DECAY = [d, Math.pow(d, 5), Math.pow(d, 15)]
###

###
# there is more than just continuous values
# cont: continuous (like number of changefeeds), will be smoothed
# disc: discrete, like blocked, will be recorded with timestamp
#       in a queue of length DISC_LEN
exports.TYPE = TYPE =
    COUNT: 'counter'    # strictly non-decreasing integer
    GAUGE: 'gauge'      # only the most recent value is recorded
    LAST : 'latest'     # only the most recent value is recorded
    DISC : 'discrete'   # timeseries of length DISC_LEN
    CONT : 'continuous' # continuous with exponential decay
    MAX  : 'contmax'    # like CONT, reduces buffer to max value
    SUM  : 'contsum'    # like CONT, reduces buffer to sum of values divided by FREQ_s
###

exports.new_counter = new_counter = (name, help, labels) ->
    # a prometheus counter -- https://github.com/siimon/prom-client#counter
    # use it like counter.labels(labelA, labelB).inc([positive number, default is 1])
    if not name.endsWith('_total')
        throw "Counter metric names have to end in [_unit]_total but I got '#{name}' -- https://prometheus.io/docs/practices/naming/"
    return new prom_client.Counter(name, help, labels)
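
# A minimal usage sketch (hypothetical metric name and label), assuming the
# prom-client API as wrapped above, where label names are fixed at creation:
#
#     login_counter = new_counter('hub_logins_total', 'number of hub logins', ['status'])
#     login_counter.labels('success').inc()    # increment by the default of 1
#     login_counter.labels('failed').inc(2)    # or by an explicit positive amount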

exports.new_gauge = new_gauge = (name, help, labels) ->
    # a prometheus gauge -- https://github.com/siimon/prom-client#gauge
    # basically, use it like gauge.labels(labelA, labelB).set(value)
    return new prom_client.Gauge(name, help, labels)

exports.new_quantile = new_quantile = (name, help, config={}) ->
    # invoked as quantile.observe(value)
    config = defaults config,
        # a few more than the default, in particular including the actual min and max
        percentiles: [0.0, 0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99, 0.999, 1.0]
        labels : []
    return new prom_client.Summary(name, help, config.labels, percentiles: config.percentiles)
exports.new_summary = new_summary = new_quantile
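
# Usage sketch: a summary tracks streaming quantiles of observed values
# (hypothetical metric name):
#
#     walltime_q = new_quantile('hub_request_seconds', 'walltime of hub requests')
#     walltime_q.observe(0.23)   # record one 230ms observation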

exports.new_histogram = new_histogram = (name, help, config={}) ->
    # invoked as histogram.observe(value)
    config = defaults config,
        buckets: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
        labels : []
    return new prom_client.Histogram(name, help, config.labels, buckets: config.buckets)
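
# Sketch: prometheus histogram buckets are cumulative, so each observe()
# increments every bucket whose upper bound is >= the value; the defaults
# above are tuned for sub-second latencies (hypothetical metric name):
#
#     query_hist = new_histogram('hub_db_query_seconds', 'walltime of database queries')
#     query_hist.observe(0.042)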

class MetricsRecorder
    constructor: (@dbg, cb) ->
        ###
        * @dbg: reporting via winston, instance with configuration passed in from hub.coffee
        ###
        # stores the current state of the statistics
        @_stats = {}
        @_types = {} # key → TYPE.T mapping

        # the full statistic
        @_data = {}
        @_collectors = []

        # initialization finished
        @setup_monitoring()
        cb?(undefined, @)

    metrics: =>
        ###
        get a serialized representation of the metrics status
        (was a dict that should be JSON, now it is for prometheus)
        it's only called by hub_http_server for the /metrics endpoint
        ###
        return prom_client.register.metrics()

    register_collector: (collector) =>
        # The added collector functions will be evaluated periodically to gather metrics
        @_collectors.push(collector)
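
    # Usage sketch: collectors are zero-argument functions invoked on every
    # @_collect tick, typically closing over a prometheus metric (hypothetical
    # names):
    #
    #     g = new_gauge('hub_queue_length', 'length of some internal queue')
    #     recorder.register_collector(-> g.set(queue.length))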

    setup_monitoring: =>
        # setup monitoring of some components
        # called by the hub *after* setting up the DB, etc.
        num_clients_gauge = new_gauge('clients_count', 'Number of connected clients')
        {number_of_clients} = require('./hub_register')
        @register_collector ->
            try
                num_clients_gauge.set(number_of_clients())
            catch
                num_clients_gauge.set(0)

        # our own CPU metrics monitor, separating user and system time!
        # it's actually a counter, since it is non-decreasing, but we'll use .set(...)
        @_cpu_seconds_total = new_gauge('process_cpu_categorized_seconds_total', 'Total number of CPU seconds used', ['type'])

        # start calling @_collect periodically, after an initial delay
        setTimeout((=> setInterval(@_collect, FREQ_s * 1000)), DELAY_s * 1000)

    _collect: =>
        # called periodically to evaluate the registered collector functions
        #@dbg('_collect called')
        for c in @_collectors
            c()
        # linux specific: collecting this process and all its children sys+user times
        # http://man7.org/linux/man-pages/man5/proc.5.html
        fs.readFile path.join('/proc', ''+process.pid, 'stat'), 'utf8', (err, infos) =>
            if err or not CLK_TCK?
                @dbg("_collect err: #{err}")
                return
            # there might be spaces in the process name, hence split after the closing bracket!
            infos = infos[infos.lastIndexOf(')') + 2...].split(' ')
            @_cpu_seconds_total.labels('user')       .set(parseFloat(infos[11]) / CLK_TCK)
            @_cpu_seconds_total.labels('system')     .set(parseFloat(infos[12]) / CLK_TCK)
            # time spent waiting on child processes
            @_cpu_seconds_total.labels('chld_user')  .set(parseFloat(infos[13]) / CLK_TCK)
            @_cpu_seconds_total.labels('chld_system').set(parseFloat(infos[14]) / CLK_TCK)
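            # note on the indices: the slice starts right after ') ', i.e. at the
            # state field (field 3 in proc(5) numbering), so utime (field 14)
            # lands at index 11, stime at 12, cutime at 13, cstime at 14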


# some of the commented code below might be used in the future when periodically collecting data (e.g. a sliding max of the "concurrent" value)
###
    # every FREQ_s the _data dict is being updated
    # e.g. current value, exp decay, later on also "intelligent" min/max, etc.
    _update: ->
        @_collect()

        smooth = (new_value, arr) ->
            arr ?= []
            arr[0] = new_value
            # compute smoothed value `sval` for each decay param
            for d, idx in DECAY
                sval = arr[idx + 1] ? new_value
                sval = d * new_value + (1-d) * sval
                arr[idx + 1] = sval
            return arr

        for key, values of @_stats
            # if no new value is available, we have to create one for smoothing
            if not (values?.length > 0)
                # fallback to last, unless discrete
                if @_types[key] != TYPE.DISC
                    [..., value] = @_data[key]
                    # in case _data[key] is empty, abort
                    if not value?
                        continue
                    # sum is a special case, because of sum/FREQ_s below
                    if @_types[key] == TYPE.SUM
                        value *= FREQ_s
                    # one-element array
                    values = [value]
                else
                    values = []

            # computing the updated value for the @_data entries
            switch @_types[key]
                when TYPE.MAX
                    @_data[key] = smooth(values[0], @_data[key])

                when TYPE.CONT
                    # compute the average value (TODO median?)
                    sum = underscore.reduce(values, ((a, b) -> a+b), 0)
                    avg = sum / values.length
                    @_data[key] = smooth(avg, @_data[key])

                when TYPE.SUM
                    # compute the cumulative sum per second (e.g. database modifications)
                    sum = underscore.reduce(values, ((a, b) -> a+b), 0)
                    sum /= FREQ_s # to get a per 1s value!
                    @_data[key] = smooth(sum, @_data[key])

                when TYPE.DISC
                    # this is a pair [timestamp, discrete value], appended to the data queue
                    queue = @_data[key] ? []
                    @_data[key] = [queue..., values...][-DISC_LEN..]

                when TYPE.LAST
                    if values?.length > 0
                        # ... just store the most recent one
                        @_data[key] = values[0]

            # we've consumed the value(s), reset them
            @_stats[key] = []

    record: (key, value, type = TYPE.CONT) =>
        # store in @_stats a key → bounded array
        if (@_types[key] ? type) != type
            @dbg("WARNING: you are switching types from #{@_types[key]} to #{type} -- IGNORED")
            return
        @_types[key] = type
        switch type
            when TYPE.LAST
                @_stats[key] = [value]
            when TYPE.CONT, TYPE.SUM
                arr = @_stats[key] ? []
                @_stats[key] = [arr..., value]
            when TYPE.MAX
                current = @_stats[key] ? Number.NEGATIVE_INFINITY
                @_stats[key] = [Math.max(value, current)]
            when TYPE.DISC
                ts = (new Date()).toISOString()
                arr = @_stats[key] ? []
                @_stats[key] = [arr..., [ts, value]]
            else
                @dbg?("hub/record_stats: unknown or undefined type #{type}")
        # avoid overflows
        @_stats[key] = @_stats[key][-MAX_BUFFER..]
###

metricsRecorder = null
exports.init = (winston, cb) ->
    dbg = (msg) ->
        winston.info("MetricsRecorder: #{msg}")
    metricsRecorder = new MetricsRecorder(dbg, cb)

exports.get = ->
    return metricsRecorder
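
# Usage sketch (hypothetical caller, e.g. from hub.coffee), assuming winston is
# already configured and this file is required under its module name:
#
#     metrics_recorder = require('./metrics-recorder')
#     metrics_recorder.init winston, (err, recorder) ->
#         # serve recorder.metrics() from the hub's /metrics HTTP endpoint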