# NOTE(review): removed web-scrape artifacts (site navigation header and view counter)
# that were accidentally pasted into this file.
###
client.coffee -- A project viewed as a client for a hub.

For security reasons, a project does *not* initiate a TCP connection to a hub,
but rather hubs initiate TCP connections to projects:

 * MINUS: This makes various things more complicated, e.g., a project
   might not have any open connection to a hub, but still "want" to write
   something to the database; in such a case it is simply out of luck
   and must wait.

 * PLUS: Security is simpler since a hub initiates the connection to
   a project.  A hub doesn't have to receive TCP connections and decide
   whether or not to trust what is on the other end of those connections.

That said, this architecture could change, and very little code would change
as a result.
###
# Close our copy of a syncstring (i.e., stop watching it for changes, etc.) if it
# has not been active for this many minutes (should be at least 5 minutes).
# Longer is better since it reduces how long a user might have to wait for save,
# etc., but it slightly increases database work (managing a changefeed).
SYNCSTRING_MAX_AGE_M = 20
#SYNCSTRING_MAX_AGE_M = 1 # TESTING

# CRITICAL: The SYNCSTRING_MAX_AGE_M idle timeout above does *NOT* apply to Sage
# worksheet syncstrings, since they also maintain the sage session, put output
# into the syncstring, etc.  It's critical that those only close when the user
# explicitly kills them, or the project is closed.
NEVER_CLOSE_SYNCSTRING_EXTENSIONS =
    'sagews'        : true
    'sage-jupyter2' : true
fs     = require('fs')
{join} = require('path')

{EventEmitter} = require('events')

async   = require('async')
winston = require('winston')
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})

require('coffee-script/register')

message    = require('smc-util/message')
misc       = require('smc-util/misc')
misc_node  = require('smc-util-node/misc_node')
synctable  = require('smc-util/synctable')
syncstring = require('smc-util/syncstring')
db_doc     = require('smc-util/db-doc')

sage_session = require('./sage_session')

jupyter = require('./jupyter/jupyter')

{json} = require('./common')

kucalc = require('./kucalc')

{Watcher} = require('./watcher')

{defaults, required} = misc

DEBUG = false
#DEBUG = true

# Easy way to enable verbose debug logging in any project anywhere: just
# create this file in the project's HOME directory.
DEBUG_FILE = process.env.HOME + '/.smc-DEBUG'
if fs.existsSync(DEBUG_FILE)
    winston.debug("'#{DEBUG_FILE}' exists, so enabling very verbose logging")
    DEBUG = true
else
    winston.debug("'#{DEBUG_FILE}' does not exist; minimal logging")
# The project's view of itself as a "client" of the hubs.  Handles the hub
# sockets, message callbacks, database queries/changefeeds, syncstring
# management, and local filesystem access on behalf of the hubs.
class exports.Client extends EventEmitter
    constructor: (@project_id) ->
        @dbg('constructor')()
        @setMaxListeners(300)  # every open file/table/sync db listens for connect event, which adds up.
        # Initialize caches: callbacks and sockets keyed by hub socket id.
        @_hub_callbacks = {}
        @_hub_client_sockets = {}
        @_changefeed_sockets = {}
        @_connected = false

        # Start listening for syncstrings that have been recently modified, so that we
        # can open them and provide filesystem and computational support.
        @_init_recent_syncstrings_table()

        if kucalc.IN_KUCALC
            kucalc.init(@)
            # Always make verbose in kucalc, since logs are taken care of by the k8s
            # logging infrastructure...
            DEBUG = true

    ###
    _test_ping: () =>
        dbg = @dbg("_test_ping")
        test = () =>
            dbg("ping")
            t0 = new Date()
            @call
                message : message.ping()
                timeout : 3
                cb      : (err, resp) =>
                    dbg("pong: #{new Date()-t0}ms; got err=#{err}, resp=#{json(resp)}")
        setInterval(test, 7*1000)

    _test_query_set: () =>
        dbg = @dbg("_test_query_set")
        test = () =>
            dbg("query")
            @query
                query :
                    projects : {title:"the project takes over!", description:"description set too"}
                cb : (err, resp) =>
                    dbg("got: err=#{err}, resp=#{json(resp)}")
        setInterval(test, 6*1000)

    _test_query_get: () =>
        dbg = @dbg("_test_query_get")
        test = () =>
            dbg("query")
            @query
                query :
                    projects : [{project_id:null, title:null, description:null}]
                timeout : 3
                cb : (err, resp) =>
                    dbg("got: err=#{err}, resp=#{json(resp)}")
        setInterval(test, 5*1000)

    _test_sync_table: () =>
        dbg = @dbg("_test_sync_table")
        table = @sync_table(projects : [{project_id:null, title:null, description:null}])
        table.on 'change', (x) =>
            dbg("table=#{json(table.get().toJS())}")
            #table.set({title:'foo'})

    _test_sync_string: () =>
        dbg = @dbg("_test_sync_string")
        dbg()
        s = @sync_string(id:'5039592f55e13b2d1b78c55ae4a4d3188f3e98a6')
        s.on 'change', () =>
            dbg("sync_string changed to='#{s.version()}'")
        return s
    ###

    # Subscribe to the database table of syncstrings in this project that were
    # active during the last SYNCSTRING_MAX_AGE_M minutes, and keep our set of
    # open syncstrings in sync with it.
    _init_recent_syncstrings_table: () =>
        dbg = @dbg("_init_recent_syncstrings_table")
        dbg()
        obj =
            project_id  : @project_id
            max_age_m   : SYNCSTRING_MAX_AGE_M
            path        : null
            last_active : null
            deleted     : null
            doctype     : null

        @_open_syncstrings = {}
        @_recent_syncstrings = @sync_table(recent_syncstrings_in_project:[obj])
        @_recent_syncstrings.on 'change', =>
            @_update_recent_syncstrings()

        @_recent_syncstrings.once 'change', =>
            # We have to do this interval check since syncstrings no longer satisfying the max_age_m query
            # do NOT automatically get removed from the table (that's just not implemented yet).
            # This interval check is also important in order to detect files that were deleted then
            # recreated.
            @_recent_syncstrings_interval = setInterval(@_update_recent_syncstrings, 300)

    # Open any recently-active syncstrings we don't have open yet, and close
    # open ones that have gone idle (except NEVER_CLOSE_SYNCSTRING_EXTENSIONS).
    _update_recent_syncstrings: () =>
        dbg = @dbg("update_recent_syncstrings")
        cutoff = misc.minutes_ago(SYNCSTRING_MAX_AGE_M)
        @_wait_syncstrings ?= {}
        keys = {}
        x = @_recent_syncstrings.get()
        if not x?
            return
        log_message = "open_syncstrings: #{misc.len(@_open_syncstrings)}; recent_syncstrings: #{x.size}"
        if log_message != @_update_recent_syncstrings_last
            # only log when something changed, to avoid flooding the log every 300ms
            winston.debug(log_message)
            @_update_recent_syncstrings_last = log_message
        x.map (val, key) =>
            string_id = val.get('string_id')
            path = val.get('path')
            if path == '.smc/local_hub/local_hub.log'
                # do NOT open this file, since opening it causes a feedback loop!  The act of opening
                # it is logged in it, which results in further logging ...!
                return
            #winston.debug("LAST_ACTIVE: #{val.get('last_active')}, typeof=#{typeof(val.get('last_active'))}")
            if val.get("last_active") > cutoff
                keys[string_id] = true   # anything not set here gets closed below.
                #dbg("considering '#{path}' with id '#{string_id}'")
                if @_open_syncstrings[string_id]? or @_wait_syncstrings[string_id]
                    # either already open or waiting a bit before opening
                    return
                if not @_open_syncstrings[string_id]?
                    deleted = val.get('deleted')
                    dbg("path='#{path}', deleted=#{deleted}, string_id='#{string_id}'")
                    async.series([
                        (cb) =>
                            if not deleted
                                # sync file (in database) is not deleted so we will open
                                cb()
                                return
                            dbg("check if '#{path}' exists")  # if so, undelete, obviously.
                            @path_exists
                                path : path
                                cb   : (err, exists) =>
                                    if err
                                        cb(err)
                                    else
                                        deleted = not exists
                                        cb()
                    ], (err) =>
                        if err
                            dbg("SERIOUS ERROR -- #{err}")
                        else if deleted
                            # do nothing -- don't open
                            dbg("ignoring deleted path '#{path}'")
                        else if not @_open_syncstrings[string_id]?
                            dbg("open syncstring '#{path}' with id '#{string_id}'")

                            ext = misc.separate_file_extension(path).ext

                            doctype = val.get('doctype')
                            if doctype?
                                dbg("using doctype='#{doctype}'")
                                doctype = misc.from_json(doctype)
                                opts = doctype.opts ? {}
                                opts.path = path
                                type = doctype.type
                            else
                                opts = {path:path}
                                type = 'string'

                            if ext == 'sage-ipython'
                                opts.change_throttle = opts.patch_interval = 5
                                opts.save_interval   = 25

                            # dispatch to @sync_string or @sync_db depending on doctype
                            ss = @_open_syncstrings[string_id] = @["sync_#{type}"](opts)

                            ss.on 'error', (err) =>
                                dbg("ERROR creating syncstring '#{path}' -- #{err}; will try again later")
                                ss.close()

                            ss.on 'close', () =>
                                dbg("remove syncstring '#{path}' with id '#{string_id}' from cache due to close")
                                delete @_open_syncstrings[string_id]
                                # Wait at least 10s before re-opening this syncstring, in case deleted:true passed to db, etc.
                                @_wait_syncstrings[string_id] = true
                                setTimeout((()=>delete @_wait_syncstrings[string_id]), 10000)

                            switch ext
                                when 'sage-jupyter2'
                                    jupyter.jupyter_backend(ss, @)

                    )
            return  # so map doesn't terminate due to funny return value

        # Close any open syncstring that is no longer active, except the
        # never-close extensions (sagews, sage-jupyter2).
        for string_id, val of @_open_syncstrings
            path = val._path
            if not keys[string_id] and not NEVER_CLOSE_SYNCSTRING_EXTENSIONS[misc.filename_extension(path)]
                dbg("close syncstring '#{path}' with id '#{string_id}'")
                val.close()
                delete @_open_syncstrings[string_id]

    # Use to define a logging function that is cleanly used internally.
    # Returns a function; when DEBUG is off it is a no-op.
    dbg: (f) =>
        if DEBUG
            return (m...) ->
                switch m.length
                    when 0
                        s = ''
                    when 1
                        s = m[0]
                    else
                        s = JSON.stringify(m)
                winston.debug("Client.#{f}: #{misc.trunc_middle(s,1000)}")
        else
            return (m) ->

    # On the frontend this would pop up an alert; in the project we just log it.
    alert_message: (opts) =>
        opts = defaults opts,
            type    : 'default'
            title   : undefined
            message : required
            block   : undefined
            timeout : undefined  # time in seconds
        @dbg('alert_message')(opts.title, opts.message)

    # todo: more could be closed...
    close: () =>
        # BUGFIX: was `for _, s of misc.keys(@_open_syncstrings)`, which iterates
        # an *array* of keys so s was a key string and s.close() threw.
        # Iterate the object itself so s is the syncstring.
        for _, s of @_open_syncstrings
            s.close()
        delete @_open_syncstrings
        clearInterval(@_recent_syncstrings_interval)

    # account_id or project_id of this client
    client_id: () =>
        return @project_id

    # true since this client is a project
    is_project: () =>
        return true

    # false since this client is not a user
    is_user: () =>
        return false

    # a project is always "signed in" to itself
    is_signed_in: () =>
        return true

    is_connected: =>
        return @_connected

    # We trust the time on our own compute servers (unlike random user's browser).
    server_time: () =>
        return new Date()

    # Declare that the given socket is active right now and can be used for
    # communication with some hub (the one the socket is connected to).
    active_socket: (socket) =>
        dbg = @dbg("active_socket(id=#{socket.id})")
        x = @_hub_client_sockets[socket.id]
        if not x?
            dbg()
            x = @_hub_client_sockets[socket.id] = {socket:socket, callbacks:{}, activity:new Date()}
            socket.on 'end', =>
                dbg("end")
                if x.callbacks?
                    # error out anything still waiting on a response via this socket
                    for id, cb of x.callbacks
                        cb?('socket closed')
                    delete x.callbacks  # so additional trigger of end doesn't do anything
                delete @_hub_client_sockets[socket.id]
                dbg("number of active sockets now equals #{misc.len(@_hub_client_sockets)}")
                if misc.len(@_hub_client_sockets) == 0
                    @_connected = false
                    dbg("lost all active sockets")
                    @emit('disconnected')
            if misc.len(@_hub_client_sockets) >= 1
                dbg("CONNECTED!")
                @_connected = true
                @emit('connected')
        else
            x.activity = new Date()

    # Handle a mesg coming back from some hub.  If we have a callback we call it
    # for the given message, then return true.  Otherwise, return
    # false, meaning something else should try to handle this message.
    handle_mesg: (mesg, socket) =>
        dbg = @dbg("handle_mesg(#{json(mesg)})")
        f = @_hub_callbacks[mesg.id]
        if f?
            dbg("calling callback")
            if not mesg.multi_response
                delete @_hub_callbacks[mesg.id]
                delete @_hub_client_sockets[socket.id].callbacks[mesg.id]
            f(mesg)
            return true
        else
            dbg("no callback")
            return false

    # Get a socket connection to the hub from one in our cache; choose the
    # connection that most recently sent us a message.  There is no guarantee
    # to get the same hub if you call this twice!  Returns undefined if there
    # are currently no connections from any hub to us (in which case, the project
    # must wait).
    get_hub_socket: () =>
        v = misc.values(@_hub_client_sockets)
        if v.length == 0
            return
        v.sort (a,b) -> misc.cmp(a.activity ? 0, b.activity ? 0)
        return v[v.length-1].socket

    # Return a list of *all* the socket connections from hubs to this local_hub.
    # BUGFIX: was declared with `=` instead of `:`, which made it a local
    # variable of the class body rather than a method, so the call
    # @get_all_hub_sockets() in query_get_changefeed_ids would throw.
    get_all_hub_sockets: () =>
        return (x.socket for x in misc.values(@_hub_client_sockets))

    # Send a message to some hub server and await a response (if cb defined).
    call: (opts) =>
        opts = defaults opts,
            message : required
            timeout : undefined   # timeout in seconds; if specified call will error out after this much time
            socket  : undefined   # if specified, use this socket
            cb      : undefined   # awaits response if given
        dbg = @dbg("call(message=#{json(opts.message)})")
        dbg()
        socket = opts.socket ?= @get_hub_socket()   # set socket to best one if no socket specified
        if not socket?
            dbg("no sockets")
            # currently, due to the security model, there's no way out of this; that will change...
            opts.cb?("no hubs currently connected to this project")
            return
        if opts.cb?
            if opts.timeout
                dbg("configure timeout")
                fail = () =>
                    dbg("failed")
                    delete @_hub_callbacks[opts.message.id]
                    opts.cb?("timeout after #{opts.timeout}s")
                    delete opts.cb
                timer = setTimeout(fail, opts.timeout*1000)
            opts.message.id ?= misc.uuid()
            cb = @_hub_callbacks[opts.message.id] = (resp) =>
                #dbg("got response: #{misc.trunc(json(resp),400)}")
                if timer?
                    clearTimeout(timer)
                    timer = undefined
                if resp.event == 'error'
                    opts.cb?(if resp.error then resp.error else 'error')
                else
                    opts.cb?(undefined, resp)
            @_hub_client_sockets[socket.id].callbacks[opts.message.id] = cb
        # Finally, send the message
        socket.write_mesg('json', opts.message)

    # Do a project_query
    query: (opts) =>
        opts = defaults opts,
            query   : required    # a query (see schema.coffee)
            changes : undefined   # whether or not to create a changefeed
            options : undefined   # options to the query, e.g., [{limit:5}] )
            timeout : 30          # how long to wait for initial result
            cb      : required
        if opts.options? and not misc.is_array(opts.options)
            throw Error("options must be an array")
        mesg = message.query
            id             : misc.uuid()
            query          : opts.query
            options        : opts.options
            changes        : opts.changes
            multi_response : opts.changes
        socket = @get_hub_socket()
        if not socket?
            # It will try later when one is available...
            opts.cb("no hub socket available")
            return
        if opts.changes
            # Record socket for this changefeed in @_changefeed_sockets
            @_changefeed_sockets[mesg.id] = socket
            # CRITICAL: On error or end, send an end error to the synctable, so that it will
            # attempt to reconnect (and also stop writing to the socket).
            # This is important, since for project clients
            # the disconnected event is only emitted when *all* connections from
            # hubs to the local_hub end.  If two connections s1 and s2 are open,
            # and s1 is used for a sync table, and s1 closes (e.g., hub1 is restarted),
            # then s2 is still open and no 'disconnected' event is emitted.  Nonetheless,
            # it's important for the project to consider the synctable broken and
            # try to reconnect it, which in this case it would do using s2.
            # NOTE(review): these listeners are never removed when the changefeed is
            # canceled, so they accumulate on long-lived sockets -- confirm/clean up.
            socket.on 'error', =>
                opts.cb('socket-end')
            socket.on 'end', =>
                opts.cb('socket-end')
        @call
            message : mesg
            timeout : opts.timeout
            socket  : socket
            cb      : opts.cb

    # Cancel an outstanding changefeed query.
    query_cancel: (opts) =>
        opts = defaults opts,
            id : required   # changefeed id
            cb : undefined
        socket = @_changefeed_sockets[opts.id]
        if not socket?
            # nothing to do
            opts.cb?()
        else
            @call
                message : message.query_cancel(id:opts.id)
                timeout : 30
                socket  : socket
                cb      : opts.cb

    # Get a list of the ids of changefeeds that remote hubs are pushing to this project.
    # This just does its best and if there is an error/timeout trying to get ids from a hub,
    # assumes that hub isn't working anymore.
    query_get_changefeed_ids: (opts) =>
        opts = defaults opts,
            timeout : 30
            cb      : required   # opts.cb(undefined, [ids...])
        ids = []
        f = (socket, cb) =>
            @call   # getting a message back with this id cancels listening
                message : message.query_get_changefeed_ids()
                timeout : opts.timeout
                socket  : socket
                cb      : (err, resp) =>
                    if not err
                        ids = ids.concat(resp.changefeed_ids)
                    cb()
        async.map @get_all_hub_sockets(), f, () =>
            opts.cb(undefined, ids)

    # Get the synchronized table defined by the given query.
    sync_table: (query, options, debounce_interval=2000, throttle_changes=undefined) =>
        return synctable.sync_table(query, options, @, debounce_interval, throttle_changes)
        # TODO maybe change here and in misc-util and everything that calls this stuff...; or change sync_string.
        #opts = defaults opts,
        #    query             : required
        #    options           : undefined
        #    debounce_interval : 2000
        #return synctable.sync_table(opts.query, opts.options, @, opts.debounce_interval)

    # Get the synchronized string with the given path.
    sync_string: (opts) =>
        opts = defaults opts,
            path           : required
            save_interval  : 500    # amount to debounce saves (in ms)
            patch_interval : 500    # debouncing of incoming patches
        opts.client = @
        opts.project_id = @project_id
        @dbg("sync_string(path='#{opts.path}')")()
        return new syncstring.SyncString(opts)

    # Get the synchronized db-style document with the given path.
    sync_db: (opts) =>
        opts = defaults opts,
            path            : required
            primary_keys    : required
            string_cols     : []
            change_throttle : 0      # amount to throttle change events (in ms)
            save_interval   : 500    # amount to debounce saves (in ms)
            patch_interval  : 500    # debouncing of incoming patches
        opts.client = @
        opts.project_id = @project_id
        @dbg("sync_db(path='#{opts.path}')")()
        return new db_doc.SyncDB(opts)

    # Write a file to a given path (relative to env.HOME) on disk; will create containing directory.
    # If file is currently being written or read in this process, will result in error (instead of silently corrupt data).
    write_file: (opts) =>
        opts = defaults opts,
            path : required
            data : required
            cb   : required
        path = join(process.env.HOME, opts.path)
        @_file_io_lock ?= {}
        dbg = @dbg("write_file(path='#{opts.path}')")
        dbg()
        now = new Date()
        if now - (@_file_io_lock[path] ? 0) < 15000  # lock automatically expires after 15 seconds (see https://github.com/sagemathinc/cocalc/issues/1147)
            dbg("LOCK")
            # Try again in about 1s.
            setTimeout((() => @write_file(opts)), 500 + 500*Math.random())
            return
        @_file_io_lock[path] = now
        dbg("@_file_io_lock = #{misc.to_json(@_file_io_lock)}")
        async.series([
            (cb) =>
                misc_node.ensure_containing_directory_exists(path, cb)
            (cb) =>
                fs.writeFile(path, opts.data, cb)
        ], (err) =>
            delete @_file_io_lock[path]
            if err
                dbg("error -- #{err}")
            else
                dbg("success")
            opts.cb(err)
        )

    # Read file as a string from disk.
    # If file is currently being written or read in this process, will result in error (instead of silently corrupt data).
    path_read: (opts) =>
        opts = defaults opts,
            path       : required
            maxsize_MB : undefined   # in megabytes; if given and file would be larger than this, then cb(err)
            cb         : required    # cb(err, file content as string (not Buffer!))
        content = undefined
        path    = join(process.env.HOME, opts.path)
        dbg     = @dbg("path_read(path='#{opts.path}', maxsize_MB=#{opts.maxsize_MB})")
        dbg()
        @_file_io_lock ?= {}

        now = new Date()
        if now - (@_file_io_lock[path] ? 0) < 15000  # lock expires after 15 seconds (see https://github.com/sagemathinc/cocalc/issues/1147)
            dbg("LOCK")
            # Try again in 1s.
            setTimeout((() => @path_read(opts)), 500 + 500*Math.random())
            return
        @_file_io_lock[path] = now

        dbg("@_file_io_lock = #{misc.to_json(@_file_io_lock)}")
        async.series([
            (cb) =>
                if opts.maxsize_MB?
                    dbg("check if file too big")
                    @file_size
                        filename : opts.path
                        cb       : (err, size) =>
                            if err
                                dbg("error checking -- #{err}")
                                cb(err)
                            else if size > opts.maxsize_MB * 1000000
                                dbg("file is too big!")
                                cb("file '#{opts.path}' size (=#{size/1000000}MB) too large (must be at most #{opts.maxsize_MB}MB); try opening it in a Terminal with vim instead or write to [email protected]")
                            else
                                dbg("file is fine")
                                cb()
                else
                    cb()
            (cb) =>
                fs.readFile path, (err, data) =>
                    if err
                        dbg("error reading file -- #{err}")
                        cb(err)
                    else
                        dbg('read file')
                        content = data.toString()
                        cb()
        ], (err) =>
            delete @_file_io_lock[path]
            opts.cb(err, content)
        )

    path_access: (opts) =>
        opts = defaults opts,
            path : required   # string
            mode : required   # string -- sub-sequence of 'rwxf' -- see https://nodejs.org/api/fs.html#fs_class_fs_stats
            cb   : required   # cb(err); err = if any access fails; err=undefined if all access is OK
        access = 0
        for s in opts.mode
            access |= fs[s.toUpperCase() + '_OK']   # e.g. 'r' -> fs.R_OK
        fs.access(opts.path, access, opts.cb)

    path_exists: (opts) =>
        opts = defaults opts,
            path : required
            cb   : required
        dbg = @dbg("checking if path (='#{opts.path}') exists")
        dbg()
        # NOTE(review): fs.exists is deprecated in node; fs.access or fs.stat
        # would be the modern equivalent, but keep it to preserve behavior.
        fs.exists opts.path, (exists) =>
            dbg("returned #{exists}")
            opts.cb(undefined, exists)  # err actually never happens with node.js, so we change api to be more consistent

    path_stat: (opts) =>  # see https://nodejs.org/api/fs.html#fs_class_fs_stats
        opts = defaults opts,
            path : required
            cb   : required
        fs.stat(opts.path, opts.cb)

    # Size of file in bytes (divide by 1000 for K, by 10^6 for MB.)
    file_size: (opts) =>
        opts = defaults opts,
            filename : required
            cb       : required
        @path_stat
            path : opts.filename
            cb   : (err, stat) =>
                opts.cb(err, stat?.size)

    # execute a command using the shell or a subprocess -- see docs for execute_code in misc_node.
    shell: (opts) =>
        misc_node.execute_code(opts)

    # return new sage session
    sage_session: (opts) =>
        opts = defaults opts,
            path : required
        return sage_session.sage_session(path:opts.path, client:@)

    # returns a Jupyter kernel session
    jupyter_kernel: (opts) =>
        opts.client = @
        return jupyter.kernel(opts)

    jupyter_kernel_info: (opts) =>
        opts = defaults opts,
            cb : required
        jupyter.get_kernel_data(opts.cb)

    # See the file watcher.coffee for docs
    watch_file: (opts) =>
        opts = defaults opts,
            path     : required
            interval : 3000   # polling interval in ms
            debounce : 1000   # don't fire until at least this many ms after the file has REMAINED UNCHANGED
        path = require('path').join(process.env.HOME, opts.path)
        dbg = @dbg("watch_file(path='#{path}')")
        dbg("watching file '#{path}'")
        return new Watcher(path, opts.interval, opts.debounce)