Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UPDATE] Add Socketio support #1292

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ const peer = new Peer("pick-an-id");
// You can pick your own id or omit the id if you want to get a random one from the server.
```

**Socketio?**
* not supported on the official peer server you need to launch your own using this [link](https://github.com/Judimax/peerjs-server/tree/PR-socketio-support)
```js
const peer = new Peer("pick-an-id",{clientType:"socketio"});
```

## Data connections

**Connect**
Expand Down
11 changes: 4 additions & 7 deletions lib/baseconnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,10 @@ export interface BaseConnectionEvents<
iceStateChanged: (state: RTCIceConnectionState) => void;
}

export abstract class BaseConnection<
SubClassEvents extends ValidEventTypes,
ErrorType extends string = never,
> extends EventEmitterWithError<
ErrorType | BaseConnectionErrorType,
SubClassEvents & BaseConnectionEvents<BaseConnectionErrorType | ErrorType>
> {
export abstract class BaseConnection<SubClassEvents extends ValidEventTypes,ErrorType extends string = never> extends
EventEmitterWithError<
ErrorType | BaseConnectionErrorType, SubClassEvents & BaseConnectionEvents<BaseConnectionErrorType | ErrorType>
> {
protected _open = false;

/**
Expand Down
28 changes: 24 additions & 4 deletions lib/dataconnection/BufferedConnection/BinaryPack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Peer } from "../../peer";
import { BufferedConnection } from "./BufferedConnection";
import { SerializationType } from "../../enums";
import { pack, type Packable, unpack } from "peerjs-js-binarypack";
import { decode, encode } from "@msgpack/msgpack";

export class BinaryPack extends BufferedConnection {
private readonly chunker = new BinaryPackChunker();
Expand All @@ -28,11 +29,19 @@ export class BinaryPack extends BufferedConnection {

// Handles a DataChannel message.
protected override _handleDataMessage({ data }: { data: Uint8Array }): void {
const deserializedData = unpack(data);

let deserializedData
if(this.options.msgpackType ==="peerjs"){
deserializedData = unpack(data);
}else{
deserializedData = decode(data);
}
// PeerJS specific message
// console.log(data)
// console.log(deserializedData)
const peerData = deserializedData["__peerData"];
if (peerData) {


if (peerData.type === "close") {
this.close();
return;
Expand All @@ -53,6 +62,7 @@ export class BinaryPack extends BufferedConnection {
total: number;
data: ArrayBuffer;
}): void {
logger.chunk(data)
const id = data.__peerData;
const chunkInfo = this._chunkedData[id] || {
data: [],
Expand All @@ -76,7 +86,13 @@ export class BinaryPack extends BufferedConnection {
}

protected override _send(data: Packable, chunked: boolean) {
const blob = pack(data);

let blob
if(this.options.msgpackType ==="peerjs"){
blob = pack(data);
}else{
blob = encode(data);
}
if (blob instanceof Promise) {
return this._send_blob(blob);
}
Expand All @@ -88,6 +104,7 @@ export class BinaryPack extends BufferedConnection {

this._bufferedSend(blob);
}

private async _send_blob(blobPromise: Promise<ArrayBufferLike>) {
const blob = await blobPromise;
if (blob.byteLength > this.chunker.chunkedMTU) {
Expand All @@ -99,10 +116,13 @@ export class BinaryPack extends BufferedConnection {
}

private _sendChunks(blob: ArrayBuffer) {
this.chunker.chunkedMTU = this.messageSize;
const blobs = this.chunker.chunk(blob);
logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
logger.chunk(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);


for (const blob of blobs) {
logger.chunk(`chunk data ${blob.toString()}`);
this.send(blob, true);
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/dataconnection/BufferedConnection/binaryPackChunker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export class BinaryPackChunker {
readonly chunkedMTU = 16300; // The original 60000 bytes setting does not work when sending data from Firefox to Chrome, which is "cut off" after 16384 bytes and delivered individually.
chunkedMTU = 16300; // The original 60000 bytes setting does not work when sending data from Firefox to Chrome, which is "cut off" after 16384 bytes and delivered individually.

// Binary stuff

Expand Down
36 changes: 35 additions & 1 deletion lib/dataconnection/DataConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { BaseConnection, type BaseConnectionEvents } from "../baseconnection";
import type { ServerMessage } from "../servermessage";
import type { EventsWithError } from "../peerError";
import { randomToken } from "../utils/randomToken";
import { BinaryPackChunker } from "./BufferedConnection/binaryPackChunker";

export interface DataConnectionEvents
extends EventsWithError<DataConnectionErrorType | BaseConnectionErrorType>,
Expand Down Expand Up @@ -38,6 +39,7 @@ export abstract class DataConnection extends BaseConnection<
private _negotiator: Negotiator<DataConnectionEvents, this>;
abstract readonly serialization: string;
readonly reliable: boolean;
messageSize =new BinaryPackChunker().chunkedMTU;

public get type() {
return ConnectionType.Data;
Expand All @@ -62,13 +64,45 @@ export abstract class DataConnection extends BaseConnection<
);
}

protected parseMaximumSize(description?: RTCSessionDescription): number {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
protected parseMaximumSize(description?: RTCSessionDescription): number {
static parseMaximumSize(description?: RTCSessionDescription): number {

const remoteLines = description?.sdp?.split('\r\n') ?? [];
logger.log("peerDescription\n" +remoteLines)
let remoteMaximumSize = 0;
for (const line of remoteLines) {
if (line.startsWith('a=max-message-size:')) {
const string = line.substring('a=max-message-size:'.length);
remoteMaximumSize = parseInt(string, 10);
break;
}
}

if (remoteMaximumSize === 0) {
logger.log('SENDER: No max message size session description');
}

// 16 kb should be supported on all clients so we can use it
// even if no max message is set
return Math.max(remoteMaximumSize, (new BinaryPackChunker()).chunkedMTU);
}

protected async updateMaximumMessageSize(): Promise<void> {
const local = await this.peerConnection!.localDescription;
const remote = await this.peerConnection!.remoteDescription;
const localMaximumSize = this.parseMaximumSize(local);
const remoteMaximumSize = this.parseMaximumSize(remote);
this.messageSize = Math.min(localMaximumSize, remoteMaximumSize);

logger.log(`SENDER: Updated max message size: ${this.messageSize} Local: ${localMaximumSize} Remote: ${remoteMaximumSize}`);
}

/** Called by the Negotiator when the DataChannel is ready. */
override _initializeDataChannel(dc: RTCDataChannel): void {
this.dataChannel = dc;

this.dataChannel.onopen = () => {
this.dataChannel.onopen = async () => {
logger.log(`DC#${this.connectionId} dc connection success`);
this._open = true;
await this.updateMaximumMessageSize()
this.emit("open");
};

Expand Down
10 changes: 10 additions & 0 deletions lib/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export enum LogLevel {
* Prints all logs.
*/
All,
/**
* prints data Chunks
*/
DataChunk
}

class Logger {
Expand All @@ -37,6 +41,12 @@ class Logger {
this._logLevel = logLevel;
}

chunk(...args: any[]) {
if (this._logLevel >= LogLevel.DataChunk) {
this._print(LogLevel.DataChunk, ...args);
}
}

log(...args: any[]) {
if (this._logLevel >= LogLevel.All) {
this._print(LogLevel.All, ...args);
Expand Down
41 changes: 33 additions & 8 deletions lib/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ class PeerOptions implements PeerJSOption {
referrerPolicy?: ReferrerPolicy;
logFunction?: (logLevel: LogLevel, ...rest: any[]) => void;
serializers?: SerializerMapping;
/**
* choose whether peerjs will be a websocket client or a socketio client
*/
clientType?:"websocket" | "socketio"
/**
* whether to use peerjs own implemation or the standard it recommended to use the standard for cross platform
*/
msgpackType?:"standard" | "peerjs"

}

export { type PeerOptions };
Expand Down Expand Up @@ -233,6 +242,8 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
config: util.defaultConfig,
referrerPolicy: "strict-origin-when-cross-origin",
serializers: {},
clientType:"websocket",
msgpackType:"peerjs",
...options,
};
this._options = options;
Expand Down Expand Up @@ -288,13 +299,20 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
return;
}


if (userId) {
this._initialize(userId);
this._initialize(userId).catch((error) => this._abort(PeerErrorType.ServerError, error));
} else {
this._api
if(this.options.clientType === "websocket"){
this._api
.retrieveId()
.then((id) => this._initialize(id))
.catch((error) => this._abort(PeerErrorType.ServerError, error));
}
else{
this._initialize().catch((error) => this._abort(PeerErrorType.ServerError, error));
}

}
}

Expand All @@ -305,6 +323,7 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
this._options.port!,
this._options.path!,
this._options.key!,
this._options.clientType,
this._options.pingInterval,
);

Expand Down Expand Up @@ -340,11 +359,16 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
}

/** Initialize a connection with the server. */
private _initialize(id: string): void {
this._id = id;
this.socket.start(id, this._options.token!);
private async _initialize(id?: string): Promise<void> {
if(this.options.clientType === "websocket"){

this._id = id;
await this.socket.start(id, this._options.token!);
} else{
await this.socket.start(id, this._options.token!);
this._id = this._socket._socketio.id
}
}

/** Handles messages from the server. */
private _handleMessage(message: ServerMessage): void {
const type = message.type;
Expand Down Expand Up @@ -705,14 +729,15 @@ export class Peer extends EventEmitterWithError<PeerErrorType, PeerEvents> {
* Destroyed peers cannot be reconnected.
* If the connection fails (as an example, if the peer's old ID is now taken),
* the peer's existing connections will not close, but any associated errors events will fire.
* should not be called often or at all with a socketio connection
*/
reconnect(): void {
async reconnect() {
if (this.disconnected && !this.destroyed) {
logger.log(
`Attempting reconnection to server with ID ${this._lastServerId}`,
);
this._disconnected = false;
this._initialize(this._lastServerId!);
await this._initialize(this._lastServerId!);
} else if (this.destroyed) {
throw new Error(
"This peer cannot reconnect to the server. It has already been destroyed.",
Expand Down
Loading