Skip to content

Commit

Permalink
fix(ipc/subprocess) (#13414)
Browse files Browse the repository at this point in the history
Co-authored-by: Jarred Sumner <[email protected]>
  • Loading branch information
cirospaciari and Jarred-Sumner authored Aug 20, 2024
1 parent 5eb053f commit eb8ed27
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 130 deletions.
74 changes: 44 additions & 30 deletions src/bun.js/api/bun/subprocess.zig
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ pub const Subprocess = struct {
flags: Flags = .{},

weak_file_sink_stdin_ptr: ?*JSC.WebCore.FileSink = null,
ref_count: u32 = 1,

usingnamespace bun.NewRefCounted(@This(), Subprocess.deinit);

pub const Flags = packed struct {
is_sync: bool = false,
Expand Down Expand Up @@ -345,7 +348,7 @@ pub const Subprocess = struct {
return this.has_pending_activity.load(.acquire);
}

pub fn ref(this: *Subprocess) void {
pub fn jsRef(this: *Subprocess) void {
this.process.enableKeepingEventLoopAlive();

if (!this.hasCalledGetter(.stdin)) {
Expand All @@ -364,7 +367,7 @@ pub const Subprocess = struct {
}

/// This disables the keeping process alive flag on the poll and also in the stdin, stdout, and stderr
pub fn unref(this: *Subprocess) void {
pub fn jsUnref(this: *Subprocess) void {
this.process.disableKeepingEventLoopAlive();

if (!this.hasCalledGetter(.stdin)) {
Expand Down Expand Up @@ -675,7 +678,6 @@ pub const Subprocess = struct {
if (this.hasExited()) {
return .{ .result = {} };
}

return this.process.kill(@intCast(sig));
}

Expand All @@ -691,23 +693,20 @@ pub const Subprocess = struct {
}

pub fn doRef(this: *Subprocess, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) JSValue {
this.ref();
this.jsRef();
return .undefined;
}

pub fn doUnref(this: *Subprocess, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) JSValue {
this.unref();
this.jsUnref();
return .undefined;
}

pub fn onStdinDestroyed(this: *Subprocess) void {
this.flags.has_stdin_destructor_called = true;
this.weak_file_sink_stdin_ptr = null;

if (this.flags.finalized) {
// if the process has already been garbage collected, we can free the memory now
bun.default_allocator.destroy(this);
} else {
defer this.deref();
if (!this.flags.finalized) {
// otherwise update the pending activity flag
this.updateHasPendingActivity();
}
Expand Down Expand Up @@ -736,19 +735,21 @@ pub const Subprocess = struct {

return .undefined;
}

pub fn disconnectIPC(this: *Subprocess, nextTick: bool) void {
const ipc_data = this.ipc() orelse return;
ipc_data.close(nextTick);
}
pub fn disconnect(this: *Subprocess, globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) JSValue {
_ = globalThis;
_ = callframe;
const ipc_data = this.ipc_maybe() orelse return .undefined;
ipc_data.close();
this.ipc_data = null;
this.disconnectIPC(true);
return .undefined;
}

pub fn getConnected(this: *Subprocess, globalThis: *JSGlobalObject) JSValue {
_ = globalThis;
return JSValue.jsBoolean(this.ipc_maybe() != null);
const ipc_data = this.ipc();
return JSValue.jsBoolean(ipc_data != null and ipc_data.?.disconnected == false);
}

pub fn pid(this: *const Subprocess) i32 {
Expand Down Expand Up @@ -837,7 +838,7 @@ pub const Subprocess = struct {
ref_count: u32 = 1,
buffer: []const u8 = "",

pub usingnamespace bun.NewRefCounted(@This(), deinit);
pub usingnamespace bun.NewRefCounted(@This(), @This().deinit);
const This = @This();
const print = bun.Output.scoped(.StaticPipeWriter, false);

Expand Down Expand Up @@ -961,7 +962,7 @@ pub const Subprocess = struct {
pub const IOReader = bun.io.BufferedReader;
pub const Poll = IOReader;

pub usingnamespace bun.NewRefCounted(PipeReader, deinit);
pub usingnamespace bun.NewRefCounted(PipeReader, PipeReader.deinit);

pub fn hasPendingActivity(this: *const PipeReader) bool {
if (this.state == .pending)
Expand Down Expand Up @@ -1238,6 +1239,7 @@ pub const Subprocess = struct {
}
pipe.writer.setParent(pipe);
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.has_stdin_destructor_called = false;

return Writable{
Expand Down Expand Up @@ -1296,6 +1298,7 @@ pub const Subprocess = struct {
}

subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
subprocess.flags.has_stdin_destructor_called = false;

pipe.writer.handle.poll.flags.insert(.socket);
Expand Down Expand Up @@ -1347,6 +1350,7 @@ pub const Subprocess = struct {
} else {
subprocess.flags.has_stdin_destructor_called = false;
subprocess.weak_file_sink_stdin_ptr = pipe;
subprocess.ref();
if (@intFromPtr(pipe.signal.ptr) == @intFromPtr(subprocess)) {
pipe.signal.clear();
}
Expand Down Expand Up @@ -1415,6 +1419,9 @@ pub const Subprocess = struct {
this_jsvalue.ensureStillAlive();
this.pid_rusage = rusage.*;
const is_sync = this.flags.is_sync;
defer this.deref();
defer this.disconnectIPC(true);


var stdin: ?*JSC.WebCore.FileSink = this.weak_file_sink_stdin_ptr;
var existing_stdin_value = JSC.JSValue.zero;
Expand Down Expand Up @@ -1557,6 +1564,11 @@ pub const Subprocess = struct {
this.on_disconnect_callback.deinit();
}

pub fn deinit(this: *Subprocess) void {
log("deinit", .{});
this.destroy();
}

pub fn finalize(this: *Subprocess) callconv(.C) void {
log("finalize", .{});
// Ensure any code which references the "this" value doesn't attempt to
Expand All @@ -1571,10 +1583,7 @@ pub const Subprocess = struct {
this.process.deref();

this.flags.finalized = true;
if (this.weak_file_sink_stdin_ptr == null) {
// if no file sink exists we can free immediately
bun.default_allocator.destroy(this);
}
this.deref();
}

pub fn getExited(
Expand Down Expand Up @@ -2057,7 +2066,7 @@ pub const Subprocess = struct {
} else {},
};

const process_allocator = globalThis.allocator();
const process_allocator = bun.default_allocator;
var subprocess = process_allocator.create(Subprocess) catch {
globalThis.throwOutOfMemory();
return .zero;
Expand Down Expand Up @@ -2156,23 +2165,28 @@ pub const Subprocess = struct {
.is_sync = is_sync,
},
};
subprocess.ref(); // + one ref for the process
subprocess.process.setExitHandler(subprocess);

if (subprocess.ipc_data) |*ipc_data| {
if (Environment.isPosix) {
if (posix_ipc_info.ext(*Subprocess)) |ctx| {
ctx.* = subprocess;
subprocess.ref(); // + one ref for the IPC
}
} else {
subprocess.ref(); // + one ref for the IPC

if (ipc_data.configureServer(
Subprocess,
subprocess,
subprocess.stdio_pipes.items[@intCast(ipc_channel)].buffer,
).asErr()) |err| {
process_allocator.destroy(subprocess);
subprocess.deref();
globalThis.throwValue(err.toJSC(globalThis));
return .zero;
}
subprocess.stdio_pipes.items[@intCast(ipc_channel)] = .unavailable;
}
ipc_data.writeVersionPacket();
}
Expand Down Expand Up @@ -2319,8 +2333,12 @@ pub const Subprocess = struct {
pub fn handleIPCClose(this: *Subprocess) void {
IPClog("Subprocess#handleIPCClose", .{});
this.updateHasPendingActivity();
const ok = this.ipc_data != null;
if (ok) this.ipc().internal_msg_queue.deinit();
defer this.deref();
var ok = false;
if (this.ipc()) |ipc_data| {
ok = true;
ipc_data.internal_msg_queue.deinit();
}
this.ipc_data = null;

const this_jsvalue = this.this_jsvalue;
Expand All @@ -2330,11 +2348,7 @@ pub const Subprocess = struct {
}
}

pub fn ipc(this: *Subprocess) *IPC.IPCData {
return &this.ipc_data.?;
}

pub fn ipc_maybe(this: *Subprocess) ?*IPC.IPCData {
pub fn ipc(this: *Subprocess) ?*IPC.IPCData {
return &(this.ipc_data orelse return null);
}

Expand Down
4 changes: 2 additions & 2 deletions src/bun.js/api/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5776,7 +5776,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
break :brk false;
};

JSC.C.JSValueUnprotect(this.globalThis, this.thisObject.asObjectRef());
this.thisObject.unprotect();
this.thisObject = .undefined;
this.stop(abrupt);
}
Expand All @@ -5786,7 +5786,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp

pub fn disposeFromJS(this: *ThisServer) JSC.JSValue {
if (this.listener != null) {
JSC.C.JSValueUnprotect(this.globalThis, this.thisObject.asObjectRef());
this.thisObject.unprotect();
this.thisObject = .undefined;
this.stop(true);
}
Expand Down
Loading

0 comments on commit eb8ed27

Please sign in to comment.