###############################################################################
#
# CoCalc: Collaborative Calculation in the Cloud
#
# Copyright (C) 2016 -- 2017, Sagemath Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################

###
PostgreSQL -- basic queries and database interface
###

exports.DEBUG = true

QUERY_ALERT_THRESH_MS = 5000

EventEmitter = require('events')

fs = require('fs')
async = require('async')

pg = require('pg').native    # You might have to do: "apt-get install libpq5 libpq-dev"
if not pg?
    throw Error("YOU MUST INSTALL the pg-native npm module")
# You can uncomment this to use the pure JavaScript driver instead.
# However: (1) it can be 5x slower or more!
#          (2) I think it corrupts something somehow in a subtle way, since our whole
#              syncstring system was breaking... until I switched to native.  Not sure.
#pg = require('pg')



winston = require('winston')
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})

misc_node = require('smc-util-node/misc_node')

{defaults} = misc = require('smc-util/misc')
required = defaults.required

{SCHEMA, client_db} = require('smc-util/schema')

read_password_from_disk = ->
    filename = (process.env.SMC_ROOT ? '.') + '/data/secrets/postgres'
    try
        #winston.debug("Loading password from '#{filename}'")
        return fs.readFileSync(filename).toString().trim()
    catch
        #winston.debug("NO PASSWORD FILE!")
        # no password file
        return

class exports.PostgreSQL extends EventEmitter    # emits a 'connect' event whenever we successfully connect to the database
    constructor: (opts) ->
        opts = defaults opts,
            host            : process.env['PGHOST'] ? 'localhost'    # or 'hostname:port'
            database        : process.env['SMC_DB'] ? 'smc'
            user            : process.env['PGUSER'] ? 'smc'
            debug           : exports.DEBUG
            connect         : true
            password        : undefined
            pool            : undefined    # IGNORED for now.
            cache_expiry    : 3000    # expire cached queries after this many milliseconds;
                                      # keep this very short -- it's just meant to reduce the impact
                                      # of a burst of identical permission checks in a single user query
            cache_size      : 100     # cache this many queries; use @_query(cache:true, ...) to cache a result
            concurrent_warn : 500
            ensure_exists   : true    # ensure database exists on startup (runs psql in a shell)
        @setMaxListeners(10000)    # because of a potentially large number of changefeeds
        @_state = 'init'
        @_debug = opts.debug
        @_ensure_exists = opts.ensure_exists
        dbg = @_dbg("constructor")    # must be after setting @_debug above
        dbg(opts)
        i = opts.host.indexOf(':')
        if i != -1
            @_host = opts.host.slice(0, i)
            @_port = parseInt(opts.host.slice(i+1))
        else
            @_host = opts.host
            @_port = 5432
        @_concurrent_warn = opts.concurrent_warn
        @_user = opts.user
        @_database = opts.database
        @_concurrent_queries = 0
        @_password = opts.password ? read_password_from_disk()
        @_init_metrics()

        if opts.cache_expiry and opts.cache_size
            @_query_cache = (new require('expiring-lru-cache'))(size:opts.cache_size, expiry: opts.cache_expiry)
        if opts.connect
            @connect()    # start trying to connect

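    # A minimal usage sketch (illustrative only -- the database name and table
    # shown are examples; adjust for your setup):
    #
    #     db = new exports.PostgreSQL(database:'smc', debug:true)
    #     db.once 'connect', ->
    #         db.count(table:'accounts', cb:(err, n) -> console.log(err, n))
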
    clear_cache: =>
        @_query_cache?.reset()

    close: =>
        if @_state == 'closed'
            return    # nothing to do
        @_state = 'closed'
        @emit('close')
        @removeAllListeners()
        if @_client?
            @_client.removeAllListeners()
            @_client.end()
            delete @_client

    engine: -> 'postgresql'

    connect: (opts) =>
        opts = defaults opts,
            max_time : undefined    # set to something shorter to not try forever
                                    # Only first max_time is used.
            cb       : undefined
        if @_state == 'closed'
            opts.cb?("closed")
            return
        dbg = @_dbg("connect")
        if @_client?
            dbg("already connected")
            opts.cb?()
            return
        if @_connecting?
            dbg('already trying to connect')
            @_connecting.push(opts.cb)
            return
        dbg('will try to connect')
        @_state = 'init'
        if opts.max_time
            dbg("for up to #{opts.max_time}ms")
        else
            dbg("until successful")
        @_connecting = [opts.cb]
        misc.retry_until_success
            f           : @_connect
            max_delay   : 10000
            max_time    : opts.max_time
            start_delay : 500 + 500*Math.random()
            log         : dbg
            cb          : (err) =>
                v = @_connecting
                delete @_connecting
                for cb in v
                    cb?(err)
                if not err
                    @_state = 'connected'
                    @emit('connect')

    disconnect: () =>
        @_client?.end()
        delete @_client

    _connect: (cb) =>
        dbg = @_dbg("_do_connect"); dbg()
        @_clear_listening_state()    # definitely not listening
        if @_client?
            @_client.end()
            delete @_client
        client = undefined
        async.series([
            (cb) =>
                @_concurrent_queries = 0
                if @_ensure_exists
                    dbg("first make sure db exists")
                    @_ensure_database_exists(cb)
                else
                    dbg("assuming database exists")
                    cb()
            (cb) =>
                dbg("create client and start connecting...")
                client = new pg.Client
                    user     : @_user
                    host     : if @_host then @_host    # undefined if @_host=''
                    port     : @_port
                    password : @_password
                    database : @_database
                if @_notification?
                    client.on('notification', @_notification)
                client.on 'error', (err) =>
                    dbg("error -- #{err}")
                    client?.end()
                    delete @_client
                    @connect()    # start trying to reconnect
                client.connect(cb)
            (cb) =>
                # CRITICAL!  At scale, this query
                #     SELECT * FROM file_use WHERE project_id = any(select project_id from projects where users ? '25e2cae4-05c7-4c28-ae22-1e6d3d2e8bb3') ORDER BY last_edited DESC limit 100;
                # will take forever due to the query planner using a nestloop scan.  We thus
                # disable doing so!
                dbg("now connected; disabling nestloop query planning.")
                client.query("SET enable_nestloop TO off", cb)
        ], (err) =>
            if err
                mesg = "Failed to connect to database -- #{err}"
                dbg(mesg)
                console.warn(mesg)    # make it clear for interactive users with debugging off -- a common mistake is the environment not being set up right
                cb?(err)
            else
                @_client = client
                dbg("connected!")
                cb?(undefined, @)
        )

    _dbg: (f) =>
        if @_debug
            return (m) => winston.debug("PostgreSQL.#{f}: #{misc.trunc_middle(JSON.stringify(m), 1000)}")
        else
            return ->

    _init_metrics: =>
        # initialize metrics
        MetricsRecorder = require('./metrics-recorder')
        @query_time_quantile = MetricsRecorder.new_quantile('db_query_ms_quantile', 'db queries',
            percentiles : [0, 0.25, 0.5, 0.75, 0.9, 0.99, 1]
            labels : ['table']
        )
        @query_time_histogram = MetricsRecorder.new_histogram('db_query_ms_histogram', 'db queries',
            buckets : [1, 5, 10, 20, 50, 100, 200, 500, 1000, 5000, 10000]
            labels : ['table']
        )
        @concurrent_counter = MetricsRecorder.new_counter('db_concurrent_total',
            'Concurrent queries (started and finished)',
            ['state']
        )

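    # Illustrative examples of the generic query interface below (the table and
    # field names here are only examples -- any table in SCHEMA works):
    #
    #     # SELECT with a parameterized WHERE clause:
    #     @_query
    #         table  : 'accounts'
    #         select : ['account_id', 'email_address']
    #         where  : {'email_address = $::TEXT' : 'user@example.com'}
    #         limit  : 1
    #         cb     : one_result((err, row) -> console.log(err, row))
    #
    #     # UPSERT via values+conflict:
    #     @_query
    #         query    : 'INSERT INTO server_settings'
    #         values   : {'name::TEXT':'version', 'value::TEXT':'1'}
    #         conflict : 'name'
    #         cb       : (err) -> console.log(err)
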
    _query: (opts) =>
        opts = defaults opts,
            query       : undefined    # can give select and table instead
            select      : undefined    # if given, should be a string or array of column names  -|  can give these
            table       : undefined    # if given, name of table                                -|  two instead of query
            params      : []
            cache       : false        # Will cache results for a few seconds or use the cache.  Use this
                                       # when speed is very important, and results that are a few seconds
                                       # out of date are fine.
            where       : undefined    # Used for SELECT: If given, can be
                                       #   - a map whose keys are clauses containing $::TYPE (not $1::TYPE!) and
                                       #     whose values are the corresponding params.  Also, WHERE must not be
                                       #     in the query already.  If where[cond] is undefined, then cond is
                                       #     completely **ignored**.
                                       #   - a string, which is inserted as-is as a normal WHERE condition.
                                       #   - an array of maps or strings.
            set         : undefined    # Appends a SET clause to the query; same format as values.
            values      : undefined    # Used for INSERT: If given, then params and where must not be given.  Values is a map
                                       #     {'field1::type1':value, 'field2::type2':value2, ...}
                                       # which gets converted to
                                       #     ' (field1, field2, ...) VALUES ($1::type1, $2::type2, ...) '
                                       # with the corresponding params set.  Undefined-valued fields are ignored
                                       # and types may be omitted.
            conflict    : undefined    # If given, then values must also be given; appends this to the query:
                                       #     ON CONFLICT (name) DO UPDATE SET value=EXCLUDED.value
                                       # Or, if conflict starts with "ON CONFLICT", then it is just included as-is, e.g.,
                                       #     "ON CONFLICT DO NOTHING"
            jsonb_set   : undefined    # Used for setting a field that contains a JSONB javascript map.
                                       # Give as input an object
                                       #
                                       #     { field1:{key1:val1, key2:val2, ...}, field2:{key3:val3,...}, ...}
                                       #
                                       # In each field, every key has the corresponding value set, unless val is
                                       # undefined/null, in which case that key is deleted from the JSONB object
                                       # fieldi.  Simple as that!  This is much, much cleaner to use than SQL.
                                       # Also, if the value in fieldi itself is NULL, it gets created automatically.
            jsonb_merge : undefined    # Exactly like jsonb_set, but when val1 (say) is an object, it merges that
                                       # object in, *instead of* setting field1[key1]=val1.  So after this,
                                       # field1[key1] has what was in it and also what is in val1.  Obviously
                                       # field1[key1] had better have been an object (or NULL).
            order_by    : undefined
            limit       : undefined
            safety_check: true
            cb          : undefined

        if not @_client?
            dbg = @_dbg("_query")
            dbg("connecting first...")
            @connect
                max_time : 45000    # don't try forever; queries could pile up.
                cb       : (err) =>
                    if err
                        dbg("FAILED to connect -- #{err}")
                        opts.cb("database is down (please try later)")
                    else
                        dbg("connected, now doing query")
                        @__do_query(opts)
        else
            @__do_query(opts)

    __do_query: (opts) =>
        dbg = @_dbg("_query('#{opts.query}')")
        if not @_client?
            # TODO: should also check that client is connected.
            opts.cb?("client not yet initialized")
            return
        if opts.params? and not misc.is_array(opts.params)
            opts.cb?("params must be an array")
            return
        if not opts.query?
            if not opts.table?
                opts.cb?("if query not given, then table must be given")
                return
            if not opts.select?
                opts.select = '*'
            if misc.is_array(opts.select)
                opts.select = (quote_field(field) for field in opts.select).join(',')
            opts.query = "SELECT #{opts.select} FROM \"#{opts.table}\""
            delete opts.select

        push_param = (param, type) ->
            if type?.toUpperCase() == 'JSONB'
                param = misc.to_json(param)    # I don't understand why this is needed by the driver....
            opts.params.push(param)
            return opts.params.length

        if opts.jsonb_merge?
            if opts.jsonb_set?
                opts.cb?("if jsonb_merge is set then jsonb_set must not be set")
                return
            opts.jsonb_set = opts.jsonb_merge

        SET = []
        if opts.jsonb_set?
            # This little piece of very hard to write (and clever?) code
            # makes it so we can set or **merge in at any nested level** (!)
            # arbitrary JSON objects.  We can also delete any key at any
            # level by making the value null or undefined!  This is amazingly
            # easy to use in queries -- basically making JSONB with postgres
            # as expressive as RethinkDB ReQL (even better in some ways).
            set = (field, data, path) =>
                obj = "COALESCE(#{field}#>'{#{path.join(',')}}', '{}'::JSONB)"
                for key, val of data
                    if not val?
                        # remove key from object
                        obj = "(#{obj} - '#{key}')"
                    else
                        if opts.jsonb_merge? and (typeof(val) == 'object' and not misc.is_date(val))
                            subobj = set(field, val, path.concat([key]))
                            obj = "JSONB_SET(#{obj}, '{#{key}}', #{subobj})"
                        else
                            # completely replace field[key] with val.
                            obj = "JSONB_SET(#{obj}, '{#{key}}', $#{push_param(val, 'JSONB')}::JSONB)"
                return obj
            v = ("#{field}=#{set(field, data, [])}" for field, data of opts.jsonb_set)
            SET.push(v...)

        if opts.values?
            #dbg("values = #{misc.to_json(opts.values)}")
            if opts.where?
                opts.cb?("where must not be defined if opts.values is defined")
                return

            if misc.is_array(opts.values)
                # An array of separate objects that we will insert all at once.
                # Determine the fields, which are the union of the keys of all the values.
                # We strip any '::type' specifier from the keys here -- the type is used
                # for the params, not the column name.
                fields = {}
                for x in opts.values
                    if not misc.is_object(x)
                        opts.cb?("if values is an array, every entry must be an object")
                        return
                    for k, p of x
                        fields[k.split('::')[0].trim()] = true
                # convert to array
                fields = misc.keys(fields)
                fields_to_index = {}
                n = 0
                for field in fields
                    fields_to_index[field] = n
                    n += 1
                values = []
                for x in opts.values
                    value = []
                    for field, param of x
                        # look up the column position using the stripped field name
                        index = fields_to_index[field.split('::')[0].trim()]
                        if field.indexOf('::') != -1
                            [field, type] = field.split('::')
                            type = type.trim()
                            y = "$#{push_param(param, type)}::#{type}"
                        else
                            y = "$#{push_param(param)}"
                        value[index] = y
                    values.push(value)
            else
                # A single entry that we'll insert.

                fields = []
                values = []
                for field, param of opts.values
                    if not param?    # ignore undefined fields -- makes code cleaner (and makes sense)
                        continue
                    if field.indexOf('::') != -1
                        [field, type] = field.split('::')
                        fields.push(quote_field(field.trim()))
                        type = type.trim()
                        values.push("$#{push_param(param, type)}::#{type}")
                        continue
                    else
                        fields.push(quote_field(field))
                        values.push("$#{push_param(param)}")
                values = [values]    # just one

            if values.length > 0
                opts.query += " (#{(quote_field(field) for field in fields).join(',')}) VALUES " + (" (#{value.join(',')}) " for value in values).join(',')

        if opts.set?
            v = []
            for field, param of opts.set
                if field.indexOf('::') != -1
                    [field, type] = field.split('::')
                    type = type.trim()
                    v.push("#{quote_field(field.trim())}=$#{push_param(param, type)}::#{type}")
                    continue
                else
                    v.push("#{quote_field(field.trim())}=$#{push_param(param)}")
            if v.length > 0
                SET.push(v...)

        if opts.conflict?
            if misc.is_string(opts.conflict) and misc.startswith(opts.conflict.toLowerCase().trim(), 'on conflict')
                # Straight string inclusion
                opts.query += ' ' + opts.conflict + ' '
            else
                if not opts.values?
                    opts.cb?("if conflict is specified then values must also be specified")
                    return
                if not misc.is_array(opts.conflict)
                    if typeof(opts.conflict) != 'string'
                        opts.cb?("conflict (='#{misc.to_json(opts.conflict)}') must be a string (the field name), for now")
                        return
                    else
                        conflict = [opts.conflict]
                else
                    conflict = opts.conflict
                v = ("#{quote_field(field)}=EXCLUDED.#{field}" for field in fields when field not in conflict)
                SET.push(v...)
                if SET.length == 0
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO NOTHING "
                else
                    opts.query += " ON CONFLICT (#{conflict.join(',')}) DO UPDATE "

        if SET.length > 0
            opts.query += " SET " + SET.join(' , ')

        WHERE = []
        push_where = (x) =>
            if typeof(x) == 'string'
                WHERE.push(x)
            else if misc.is_array(x)
                for v in x
                    push_where(v)
            else if misc.is_object(x)
                for cond, param of x
                    if typeof(cond) != 'string'
                        opts.cb?("each condition must be a string but '#{cond}' isn't")
                        return
                    if not param?    # *IGNORE* where conditions where the value is explicitly undefined
                        continue
                    if cond.indexOf('$') == -1
                        # where condition is missing its $ parameter -- default to equality
                        cond += " = $"
                    WHERE.push(cond.replace('$', "$#{push_param(param)}"))

        if opts.where?
            push_where(opts.where)

        if WHERE.length > 0
            if opts.values?
                opts.cb?("values must not be given if where clause given")
                return
            opts.query += " WHERE #{WHERE.join(' AND ')}"

        if opts.order_by?
            opts.query += " ORDER BY #{opts.order_by} "

        if opts.limit?
            opts.query += " LIMIT #{opts.limit} "

        dbg("query='#{opts.query}'")

        if opts.safety_check
            safety_check = opts.query.toLowerCase()
            if (safety_check.indexOf('update') != -1 or safety_check.indexOf('delete') != -1) and (safety_check.indexOf('where') == -1 and safety_check.indexOf('trigger') == -1 and safety_check.indexOf('insert') == -1 and safety_check.indexOf('create') == -1)
                # This is always a bug.
                err = "ERROR -- Dangerous UPDATE or DELETE without a WHERE, TRIGGER, or INSERT: query='#{opts.query}'"
                dbg(err)
                opts.cb?(err)
                return

        if opts.cache and @_query_cache?
            # check for cached result
            full_query_string = JSON.stringify([opts.query, opts.params])
            if (x = @_query_cache.get(full_query_string))?
                dbg("using cache for '#{opts.query}'")
                opts.cb?(x...)
                return

        # params can easily be huge, e.g., a blob.  But this may be
        # needed at some point for debugging.
        #dbg("query='#{opts.query}', params=#{misc.to_json(opts.params)}")

        @_concurrent_queries += 1
        @concurrent_counter.labels('started').inc(1)
        try
            start = new Date()
            @_client.query opts.query, opts.params, (err, result) =>
                query_time_ms = new Date() - start
                @_concurrent_queries -= 1
                @query_time_quantile.observe({table:opts.table ? ''}, query_time_ms)
                @query_time_histogram.observe({table:opts.table ? ''}, query_time_ms)
                @concurrent_counter.labels('ended').inc(1)
                if err
                    dbg("done (concurrent=#{@_concurrent_queries}), (query_time_ms=#{query_time_ms}) -- error: #{err}")
                    err = 'postgresql ' + err
                else
                    dbg("done (concurrent=#{@_concurrent_queries}) (query_time_ms=#{query_time_ms}) -- success")
                if opts.cache and @_query_cache?
                    @_query_cache.set(full_query_string, [err, result])
                opts.cb?(err, result)
                if query_time_ms >= QUERY_ALERT_THRESH_MS
                    dbg("QUERY_ALERT_THRESH: query_time_ms=#{query_time_ms}\nQUERY_ALERT_THRESH: query='#{opts.query}'\nQUERY_ALERT_THRESH: params='#{misc.to_json(opts.params)}'")
        catch e
            # this should never ever happen
            dbg("EXCEPTION in @_client.query: #{e}")
            opts.cb?(e)
            @_concurrent_queries -= 1
            @concurrent_counter.labels('ended').inc(1)
        return

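    # Illustrative jsonb_merge sketch (the table and field here are examples
    # only): merge {foo:{bar:1}} into the JSONB column 'settings' of one row,
    # without clobbering other keys already stored under 'settings' or
    # 'settings.foo':
    #
    #     @_query
    #         query       : "UPDATE accounts"
    #         jsonb_merge : {settings : {foo : {bar : 1}}}
    #         where       : {"account_id = $::UUID" : account_id}
    #         cb          : cb
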
    # Special case of query for counting entries in a table.
    _count: (opts) =>
        opts = defaults opts,
            table : required
            where : undefined    # as in _query
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            where : opts.where
            cb    : count_result(opts.cb)

    _validate_opts: (opts) =>
        for k, v of opts
            if k.slice(k.length-2) == 'id'
                if v? and not misc.is_valid_uuid_string(v)
                    opts.cb?("invalid #{k} -- #{v}")
                    return false
            if k.slice(k.length-3) == 'ids'
                for w in v
                    if not misc.is_valid_uuid_string(w)
                        opts.cb?("invalid uuid #{w} in #{k} -- #{misc.to_json(v)}")
                        return false
            if k == 'group' and v not in misc.PROJECT_GROUPS
                opts.cb?("unknown project group '#{v}'"); return false
            if k == 'groups'
                for w in v
                    if w not in misc.PROJECT_GROUPS
                        opts.cb?("unknown project group '#{w}' in groups"); return false
        return true

    _ensure_database_exists: (cb) =>
        dbg = @_dbg("_ensure_database_exists")
        dbg("ensure database '#{@_database}' exists")
        args = ['--user', @_user, '--host', @_host, '--port', @_port, '--list', '--tuples-only']
        dbg("psql #{args.join(' ')}")
        misc_node.execute_code
            command : 'psql'
            args    : args
            env     :
                PGPASSWORD : @_password
            cb      : (err, output) =>
                if err
                    cb(err)
                    return
                databases = (x.split('|')[0].trim() for x in output.stdout.split('\n') when x)
                if @_database in databases
                    dbg("database '#{@_database}' already exists")
                    cb()
                    return
                dbg("creating database '#{@_database}'")
                misc_node.execute_code
                    command : 'createdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    cb      : cb

    _confirm_delete: (opts) =>
        opts = defaults opts,
            confirm : 'no'
            cb      : required
        dbg = @_dbg("confirm")
        if opts.confirm != 'yes'
            err = "Really delete all data? -- you must explicitly pass in confirm='yes' (but confirm:'#{opts.confirm}')"
            dbg(err)
            opts.cb(err)
            return false
        else
            return true

    set_random_password: (opts) =>
        throw Error("NotImplementedError")

    # This will fail if any other clients have the db open.
    # This function is very important for automated testing.
    delete_entire_database: (opts) =>
        dbg = @_dbg("delete_entire_database")
        dbg("deleting database '#{@_database}'")
        if not @_confirm_delete(opts)
            dbg("failed confirmation")
            return
        async.series([
            (cb) =>
                dbg("disconnect from db")
                @_client.end(cb)
            (cb) =>
                misc_node.execute_code
                    command : 'dropdb'
                    args    : ['--host', @_host, '--port', @_port, @_database]
                    cb      : cb
        ], opts.cb)

    # Deletes all the contents of the tables in the database.  It doesn't
    # delete anything about the schema itself: indexes or tables.
    delete_all: (opts) =>
        dbg = @_dbg("delete_all")
        dbg("deleting all contents of tables in '#{@_database}'")
        if not @_confirm_delete(opts)
            return

        # If the cache is enabled, be sure to also clear it.
        @clear_cache()

        tables = undefined

        # Delete anything cached in the db object.  Obviously, forgetting to clear
        # something here is a natural way to cause bugs... but they will probably
        # all be bugs of the form "the test suite fails", so we'll find them.
        delete @_stats_cached

        # Actually delete the contents of the tables.
        async.series([
            (cb) =>
                @_get_tables (err, t) =>
                    tables = t; cb(err)
            (cb) =>
                f = (table, cb) =>
                    @_query
                        query        : "DELETE FROM #{table}"
                        safety_check : false
                        cb           : cb
                async.map(tables, f, cb)
        ], opts.cb)

    # return list of tables in the database
    _get_tables: (cb) =>
        @_query
            query : "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.table_name for row in result.rows))

    # Return list of columns in a given table
    _get_columns: (table, cb) =>
        @_query
            query : "SELECT column_name FROM information_schema.columns"
            where :
                "table_name = $::text" : table
            cb    : (err, result) =>
                if err
                    cb(err)
                else
                    cb(undefined, (row.column_name for row in result.rows))

    _primary_keys: (table) =>
        return client_db.primary_keys(table)

    # Return *the* primary key, assuming unique; otherwise raise an exception.
    _primary_key: (table) =>
        v = @_primary_keys(table)
        if v.length != 1
            throw Error("compound primary key tables not yet supported")
        return v[0]

    _create_table: (table, cb) =>
        dbg = @_dbg("_create_table('#{table}')")
        dbg()
        schema = SCHEMA[table]
        if not schema?
            cb("no table '#{table}' in schema")
            return
        if schema.virtual
            cb("table '#{table}' is virtual")
            return
        columns = []
        try
            primary_keys = @_primary_keys(table)
        catch e
            cb(e)
            return
        for column, info of schema.fields
            s = "#{quote_field(column)} #{pg_type(info)}"
            if info.unique
                s += " UNIQUE"
            if info.pg_check
                s += " " + info.pg_check
            columns.push(s)
        async.series([
            (cb) =>
                dbg("creating the table")
                @_query
                    query : "CREATE TABLE #{table} (#{columns.join(', ')}, PRIMARY KEY(#{primary_keys.join(', ')}))"
                    cb    : cb
            (cb) =>
                @_create_indexes(table, cb)
        ], cb)

    _create_indexes_queries: (table) =>
        schema = SCHEMA[table]
        v = schema.pg_indexes ? []
        if schema.fields.expire? and 'expire' not in v
            v.push('expire')
        queries = []
        for query in v
            name = "#{table}_#{misc.make_valid_name(query)}_idx"    # this first, then...
            if query.indexOf('(') == -1    # do this **after** making the name.
                query = "(#{query})"
            queries.push({name:name, query:query})
        return queries

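    # For example (illustrative -- the table is hypothetical): for a table
    # 'patches' whose schema lists pg_indexes:['time'], the routine above
    # yields {name:'patches_time_idx', query:'(time)'}, which _create_indexes
    # (below) turns into
    #     CREATE INDEX patches_time_idx ON patches (time)
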
    _create_indexes: (table, cb) =>
        dbg = @_dbg("_create_indexes('#{table}')")
        dbg()
        pg_indexes = @_create_indexes_queries(table)
        if pg_indexes.length == 0
            dbg("no indexes")
            cb()
            return
        dbg("creating indexes")
        f = (info, cb) =>
            query = info.query
            # Shorthand index is just the part in parens.
            query = "CREATE INDEX #{info.name} ON #{table} #{query}"
            @_query
                query : query
                cb    : cb
        async.map(pg_indexes, f, cb)

    # Ensure that for the given table, the actual schema and indexes
    # in the database match the ones defined in SCHEMA.
    _update_table_schema: (table, cb) =>
        dbg = @_dbg("_update_table_schema('#{table}')")
        dbg()
        schema = SCHEMA[table]
        if not schema?    # some auxiliary table in the database that is not in our schema -- leave it alone!
            cb()
            return
        if schema.virtual
            cb("table '#{table}' is virtual")
            return
        async.series([
            (cb) => @_update_table_schema_columns(table, cb)
            (cb) => @_update_table_schema_indexes(table, cb)
        ], cb)

    _update_table_schema_columns: (table, cb) =>
        dbg = @_dbg("_update_table_schema_columns('#{table}')")
        dbg()
        schema = SCHEMA[table]
        columns = {}
        async.series([
            (cb) =>
                @_query
                    query : "select column_name, data_type, character_maximum_length from information_schema.columns"
                    where : {table_name: table}
                    cb    : all_results (err, x) =>
                        if err
                            cb(err)
                        else
                            for y in x
                                if y.character_maximum_length
                                    columns[y.column_name] = "varchar(#{y.character_maximum_length})"
                                else
                                    columns[y.column_name] = y.data_type
                            cb()
            (cb) =>
                # Note: changing column ordering is NOT supported in PostgreSQL, so
                # it's critical to not depend on it!
                # https://wiki.postgresql.org/wiki/Alter_column_position
                tasks = []
                for column, info of schema.fields
                    cur_type = columns[column]?.toLowerCase()
                    if cur_type?
                        cur_type = cur_type.split(' ')[0]
                    try
                        goal_type = pg_type(info).toLowerCase()
                    catch err
                        cb(err)
                        return
                    goal_type = goal_type.split(' ')[0]
                    if goal_type.slice(0,4) == 'char'
                        # we do NOT support changing between fixed-length and variable-length strings
                        goal_type = 'var' + goal_type
                    if not cur_type?
                        # column is in our schema, but not in the actual database
                        dbg("will add column '#{column}' to the database")
                        tasks.push({action:'add', column:column})
                    else if cur_type != goal_type
                        if goal_type.indexOf('[]') != -1
                            # NO support for array schema changes (even detecting them)!
                            continue
                        dbg("type difference for column '#{column}' -- cur='#{cur_type}' versus goal='#{goal_type}'")
                        tasks.push({action:'alter', column:column})
                # Note: we do not delete columns in the database, since that's scary
                # and they don't cause any harm (except wasting space).

                if tasks.length == 0
                    dbg("no changes to the table are needed")
                    cb()
                    return
                dbg("#{tasks.length} columns will be altered")
                do_task = (task, cb) =>
                    info = schema.fields[task.column]
                    col  = quote_field(task.column)
                    try
                        type = pg_type(info)
                    catch err
                        cb(err)
                        return
                    desc = type
                    if info.unique
                        desc += " UNIQUE"
                    if info.pg_check
                        desc += " " + info.pg_check
                    switch task.action
                        when 'alter'
                            @_query
                                query : "ALTER TABLE #{quote_field(table)} ALTER COLUMN #{col} TYPE #{desc} USING #{col}::#{type}"
                                cb    : cb
                        when 'add'
                            @_query
                                query : "ALTER TABLE #{quote_field(table)} ADD COLUMN #{col} #{desc}"
                                cb    : cb
                        else
                            cb("unknown action '#{task.action}'")
                async.mapSeries(tasks, do_task, cb)
        ], (err) -> cb?(err))

    _update_table_schema_indexes: (table, cb) =>
        dbg = @_dbg("_update_table_schema_indexes('#{table}')")
        cur_indexes = {}
        async.series([
            (cb) =>
                dbg("getting list of existing indexes")
                @_query    # see http://www.alberton.info/postgresql_meta_info.html
                    query : "SELECT c.relname AS name FROM pg_class AS a JOIN pg_index AS b ON (a.oid = b.indrelid) JOIN pg_class AS c ON (c.oid = b.indexrelid)"
                    where : {'a.relname': table}
                    cb    : all_results 'name', (err, x) =>
                        if err
                            cb(err)
                        else
                            for name in x
                                cur_indexes[name] = true
                            cb()
            (cb) =>
                # these are the indexes we are supposed to have
                goal_indexes = @_create_indexes_queries(table)
                goal_indexes_map = {}
                tasks = []
                for x in goal_indexes
                    goal_indexes_map[x.name] = true
                    if not cur_indexes[x.name]
                        tasks.push({action:'create', name:x.name, query:x.query})
                for name of cur_indexes
                    # only delete indexes that end with _idx; we don't want to delete, e.g., pkey primary key indexes.
                    if misc.endswith(name, '_idx') and not goal_indexes_map[name]
                        tasks.push({action:'delete', name:name})
                do_task = (task, cb) =>
                    switch task.action
                        when 'create'
                            @_query
                                query : "CREATE INDEX #{task.name} ON #{table} #{task.query}"
                                cb    : cb
                        when 'delete'
                            @_query
                                query : "DROP INDEX #{task.name}"
                                cb    : cb
                        else
                            cb("unknown action '#{task.action}'")
                async.map(tasks, do_task, cb)
        ], (err) -> cb?(err))

    _throttle: (name, time_s, key...) =>
        key = misc.to_json(key)
        x = "_throttle_#{name}"
        @[x] ?= {}
        if @[x][key]
            return true
        @[x][key] = true
        setTimeout((()=>delete @[x][key]), time_s*1000)
        return false

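    # Illustrative usage of _throttle (the method and name below are
    # hypothetical): the first call with a given name+key returns false and
    # arms a timer; repeat calls within time_s seconds return true, so the
    # caller can skip redundant work.
    #
    #     touch_project: (project_id) =>
    #         return if @_throttle('touch_project', 60, project_id)
    #         # ... actually touch the project ...
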
    # Ensure that the actual schema in the database matches the one defined in SCHEMA.
    # This creates the initial schema, adds new columns fine, and in a VERY LIMITED
    # range of cases, *might* be able to change the data type of a column.
    update_schema: (opts) =>
        opts = defaults opts,
            cb : undefined
        dbg = @_dbg("update_schema"); dbg()

        psql_tables = goal_tables = created_tables = undefined
        async.series([
            (cb) =>
                # Get a list of all tables that should exist
                dbg("get tables")
                @_get_tables (err, t) =>
                    psql_tables = t
                    dbg("psql_tables = #{misc.to_json(psql_tables)}")
                    goal_tables = (t for t,s of SCHEMA when t not in psql_tables and not s.virtual)
                    dbg("goal_tables = #{misc.to_json(goal_tables)}")
                    cb(err)
            (cb) =>
                # Create from scratch any missing tables -- usually this creates all tables and
                # indexes the first time around.
                to_create = (table for table in goal_tables when table not in psql_tables)
                created_tables = {}
                for table in to_create
                    created_tables[table] = true
                if to_create.length == 0
                    dbg("there are no missing tables in psql")
                    cb()
                    return
                dbg("creating #{to_create.length} missing tables")
                async.map to_create, @_create_table, (err) =>
                    if err
                        dbg("error creating tables -- #{err}")
                    cb(err)
            (cb) =>
                # For each table that already exists, ensure that the columns are correct,
                # have the correct type, and all indexes exist.
                old_tables = (table for table in psql_tables when not created_tables[table])
                if old_tables.length == 0
                    dbg("no old tables to update")
                    cb()
                    return
                dbg("verifying schema and indexes for #{old_tables.length} existing tables")
                async.map old_tables, @_update_table_schema, (err) =>
                    if err
                        dbg("error updating table schemas -- #{err}")
                    cb(err)
        ], (err) => opts.cb?(err))

    # Return the number of outstanding concurrent queries.
    concurrent: () =>
        return @_concurrent_queries

    # Compute the sha1 hash (in hex) of the input arguments, which are
    # converted to strings (via json) if they are not strings, then concatenated.
    # This is used for computing compound primary keys in a way that is relatively
    # safe, and in situations where if there were a highly unlikely collision, it
    # wouldn't be the end of the world.  There is a similar client-only slower version
    # of this function (in schema.coffee), so don't change it willy nilly.
    sha1: (args...) ->
        v = ((if typeof(x) == 'string' then x else JSON.stringify(x)) for x in args).join('')
        return misc_node.sha1(v)

    # Go through every table in the schema with a column called "expire", and
    # delete every entry where expire is <= right now.
    delete_expired: (opts) =>
        opts = defaults opts,
            count_only : false        # if true, only count the number of rows that would be deleted
            table      : undefined    # only delete from this table
            cb         : required
        dbg = @_dbg("delete_expired(...)")
        dbg()
        f = (table, cb) =>
            dbg("table='#{table}'")
            if opts.count_only
                @_query
                    query : "SELECT COUNT(*) FROM #{table} WHERE expire <= NOW()"
                    cb    : (err, result) =>
                        if not err
                            dbg("COUNT for table #{table} is #{result.rows[0].count}")
                        cb(err)
            else
                dbg("deleting expired entries from '#{table}'")
                @_query
                    query : "DELETE FROM #{table} WHERE expire <= NOW()"
                    cb    : (err) =>
                        dbg("finished deleting expired entries from '#{table}' -- #{err}")
                        cb(err)
        if opts.table
            tables = [opts.table]
        else
            tables = (k for k, v of SCHEMA when v.fields?.expire? and not v.virtual)
        async.map(tables, f, opts.cb)

    # count number of entries in a table
    count: (opts) =>
        opts = defaults opts,
            table : required
            cb    : required
        @_query
            query : "SELECT COUNT(*) FROM #{opts.table}"
            cb    : count_result(opts.cb)

###
Other misc functions
###

# Convert from info in the schema table to a pg type
# See https://www.postgresql.org/docs/devel/static/datatype.html
# The returned type from this function is upper case!
exports.pg_type = pg_type = (info) ->
    if not info? or typeof(info) == 'boolean'
        throw Error("pg_type: insufficient information to determine type (info=#{typeof(info)})")
    if info.pg_type?
        return info.pg_type
    if not info.type?
        throw Error("pg_type: insufficient information to determine type (pg_type and type both not given)")
    type = info.type.toLowerCase()
    switch type
        when 'uuid'
            return 'UUID'
        when 'timestamp'
            return 'TIMESTAMP'
        when 'string', 'text'
            return 'TEXT'
        when 'boolean'
            return 'BOOLEAN'
        when 'map'
            return 'JSONB'
        when 'integer'
            return 'INTEGER'
        when 'number', 'double', 'float'
            return 'DOUBLE PRECISION'
        when 'array'
            throw Error("pg_type: you must specify the array type explicitly (info=#{misc.to_json(info)})")
        when 'buffer'
            return "BYTEA"
        else
            throw Error("pg_type: unknown type '#{type}'")

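# For instance: pg_type(type:'map') is 'JSONB', pg_type(type:'timestamp') is
# 'TIMESTAMP', and an explicit pg_type is passed straight through, e.g.,
# pg_type(pg_type:'VARCHAR(254)') is 'VARCHAR(254)' (that VARCHAR value is
# just an example).
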
# Certain field names we used with RethinkDB
# aren't allowed without quoting in Postgres.
NEEDS_QUOTING =
    user : true
exports.quote_field = quote_field = (field) ->
    if field[0] == '"'    # already quoted
        return field
    return "\"#{field}\""
    #if NEEDS_QUOTING[field]
    #    return "\"#{field}\""
    #return field

# Timestamp the given number of seconds **in the future**.
exports.expire_time = expire_time = (ttl) ->
    if ttl then new Date((new Date() - 0) + ttl*1000)

# Returns a function that takes as input the output of doing a SQL query.
# If there are no results, returns undefined.
# If there is exactly one result, what is returned depends on pattern:
#     'a_field' --> returns the value of this field in the result
# If more than one result, an error
exports.one_result = one_result = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->    # do nothing -- return function that ignores result
    return (err, result) ->
        if err
            cb(err)
            return
        if not result?.rows?
            cb()
            return
        switch result.rows.length
            when 0
                cb()
            when 1
                obj = misc.map_without_undefined(result.rows[0])
                if not pattern?
                    cb(undefined, obj)
                    return
                switch typeof(pattern)
                    when 'string'
                        x = obj[pattern]
                        if not x?    # null or undefined -- SQL returns null, but we want undefined
                            cb()
                        else
                            if obj.expire? and new Date() >= obj.expire
                                cb()
                            else
                                cb(undefined, x)
                    when 'object'
                        x = {}
                        for p in pattern
                            if obj[p]?
                                x[p] = obj[p]
                        cb(undefined, x)
                    else
                        cb("BUG: unknown pattern -- #{pattern}")
            else
                cb("more than one result")

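# Illustrative usage of one_result and count_result (table and field names
# here are examples only):
#
#     db._query
#         query : "SELECT COUNT(*) FROM accounts"
#         cb    : count_result((err, n) -> console.log("#{n} accounts"))
#
#     db._query
#         table : 'accounts'
#         where : {'account_id = $::UUID' : account_id}
#         cb    : one_result('email_address', (err, email) -> console.log(err, email))
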
exports.all_results = all_results = (pattern, cb) ->
    if not cb? and typeof(pattern) == 'function'
        cb = pattern
        pattern = undefined
    if not cb?
        return ->    # do nothing -- return function that ignores result
    return (err, result) ->
        if err
            cb(err)
        else
            rows = result.rows
            if not pattern?
                # TODO: we use stupid (?) misc.copy to unwrap from the pg driver type -- investigate better!
                # Maybe this is fine.  I don't know.
                cb(undefined, (misc.copy(x) for x in rows))
            else if typeof(pattern) == 'string'
                cb(undefined, ((x[pattern] ? undefined) for x in rows))
            else
                cb("unsupported pattern type '#{typeof(pattern)}'")


exports.count_result = count_result = (cb) ->
    if not cb?
        return ->    # do nothing -- return function that ignores result
    return (err, result) ->
        if err
            cb(err)
        else
            cb(undefined, parseInt(result?.rows?[0]?.count))