#StackBounty: #javascript #reactjs #mongodb #async-await #objectid Finding MongoDB documents by _id failing on the type of ObjectId

Bounty: 100

I’m struggling to find MongoDB documents by their _id field in my ReactJS project.

My collection has documents that looks like so (for example):

    _id: ObjectId("5f6651112efc19f33b34fc39")
    title: "This is a title"
    status: true

I’m using this (greatly simplified) code in a function to find the documents that match the id passed in:

const id = '5f6651112efc19f33b34fc39';
await mongoCollection.find({_id:ObjectId(id)});

ObjectId is defined like so:

const ObjectId = require('mongodb').ObjectID;

Yet even if I hard coded the id variable to be the ObjectId string from the document in my database, it fails with this error:

Uncaught (in promise) Error: invalid ObjectId, ObjectId.id must be either a string or a Buffer, but is [{"type":"Buffer","data":[]}]

Printing out id and ObjectId(id) before the await line results in the following:

Console.log screenshot

How should I be satisfying this warning?

Edit: I’m defining my app/collection like so:

const app = new Realm.App({ id: "<app-id>", timeout: 10000 });
const mongo = app.services.mongodb('mongodb-atlas');
const mongoCol = mongo.db('<databaseName>').collection('<collectionName>');


Get this bounty!!!

#StackBounty: #javascript #node.js #async-await #passport.js #openid-connect Node with Express session issue

Bounty: 100

I use the following code which works, However after a few success calls (5-10), we sometimes get an internal server error:

req.session["oidc:accounts.rvm.com"] is undefined

I’ve tried all the latest open source versions.

Error: did not find expected authorization request details in session, req.session["oidc:accounts.rvm.com"] is undefined
at /opt/node_app/app/node_modules/openid-client/lib/passport_strategy.js:125:13
at OpenIDConnectStrategy.authenticate (/opt/node_app/app/node_modules/openid-client/lib/passport_strategy.js:173:5)
at attempt (/opt/node_app/app/node_modules/passport/lib/middleware/authenticate.js:366:16)
at authenticate (/opt/node_app/app/node_modules/passport/lib/middleware/authenticate.js:367:7)
at /opt/node_app/app/src/logon.js:92:7 *******
at Layer.handle [as handle_request] (/opt/node_app/app/node_modules/express/lib/router/layer.js:95:5)
at next (/opt/node_app/app/node_modules/express/lib/router/route.js:137:13)
at Route.dispatch (/opt/node_app/app/node_modules/express/lib/router/route.js:112:3)
at Layer.handle [as handle_request] (/opt/node_app/app/node_modules/express/lib/router/layer.js:95:5)
at /opt/node_app/app/node_modules/express/lib/router/index.js:281:22

My code from the stack is:

at /opt/node_app/app/src/logon.js:92:7

Which is the end of the code here:

})(req, res, next);   // here is line 92 but not sure if it's related 

This is the full code (I pass the app which is simply an express server):

index.js

const express = require('express');
const logon = require('./logon');

const app = express();
const port = process.env.PORT || 4000;

logon(app)
  .then(() => {
    console.log('process started');
  });
app.use(express.json());

app.listen(port,
  () => console.log(`listening on port: ${port}`));

logon.js

const { Issuer, Strategy } = require('openid-client');
const cookieParser = require('cookie-parser');
const cookieSession = require('cookie-session');
const azpi = require('./azpi');
const bodyParser = require('body-parser');
const passport = require('passport');

module.exports = async (app) => {
  let oSrv;
  const durl = `${process.env.srvurl}/.well-known/openid-configuration`;
  try {
    oSrv = await Issuer.discover(durl);
  } catch (err) {
    console.log('error occured', err);
    return;
  }

  app.get('/', prs(), passport.authenticate('oidc'));

  const oSrvCli = new oSrv.Client({
    client_id: process.env.ci,
    client_secret: process.env.cs,
    token_endpoint_auth_method: 'client_secret_basic',
  });

  passport.serializeUser((user, done) => {
    done(null, user);
  });
  passport.deserializeUser((obj, done) => {
    done(null, obj);
  });

  const cfg = {
    scope: 'openid',
    redirect_uri: process.env.ruri,
    response_type: 'code',
    response_mode: 'form_post',
  };

  const prs = () => (req, res, next) => {
    passport.use(
      'oidc',
      new Strategy({ oSrvCli , cfg }, (tokenset, done) => {
        const claims = tokenset.claims();
        // first log
        console.log(`1. ------------User claims received------------);
        const user = {
          name: claims.name,
          id: claims.sub,
          id_token: tokenset.id_token,
        };
        return done(null, user);
      }),
    );
    next();
  };
  app.use(
    bodyParser.urlencoded({
      extended: false,
    }),
  );
  app.use(cookieParser('csec'));
  app.use(
    cookieSession({
      name: 'zta-auth',
      secret: 'csect',
    }),
  );

  app.use(passport.initialize());
  app.use(passport.session());

  app.get('/redirect', async (req, res, next) => {
    await passport.authenticate('oidc', async (err, user) => {
    // print second log
    console.log('2. ------------redirect Called!------------');
      if (err) {
        console.log(`Authentication failed: ${err}`);
        return next(err);
      }
      if (!user) {
        return res.send('no identity');
      }

      req.login(user, async (e) => {
        if (e) {
          console.log('not able to login', e);
          return next(e);
        }
        try {
          const url = await azpi.GetUsers(user.id_token);
          // print last log
          console.log('3. ------------user process finished successfully----');
          return res.redirect(url);
          
        } catch (er) {
          res.send(er.message);
        }
      });
    })(req, res, next);   //here is the error
  });
};

Sometimes when I debug, I see that the function is running out from GetUsers which is an async function and stops in })(req, res, next);, maybe it’s an async issue.

We want to use this code in prod instead of the previous Java implementation.

If I can use another technique for oidc, please let me know.


UPDATE

Each should be a single call and log in this order:

1. ------------User claims received------------
2. ------------redirect Called!------------
3. ------------user process finished successfully----

But, when I get the error:

1. ------------User claims received------------
2. ------------redirect Called!------------
3. ------------user process finished successfully----

2. ------------redirect Called!------------
Authentication failed: Error: did not find expected authorization request details in session, req.session

All the successful calls have the right log order (1-3).

When it fails, the first call User claims received doesn’t happen, just the second and the error.

If there is another way to achieve this (other lib’s etc), please let me know.


I’ve found this library which may help as it doesn’t use passport (I want to reduce deps to see where the problem is coming from).

When I try something like this:

app.use(
    auth({
     issuerBaseURL: `${URL}/.well-known/openid-configuration`,
     authorizationParams: {
    ...
     response_mode: 'form_post',
    }

I get this error: issuer response_mode supporting only "query" or "fragment", but when I run the code above (in the beginning of the post) with the same issuer and response_mode, everything is working, any ideas?


Get this bounty!!!

#StackBounty: #javascript #concurrency #async-await #try-catch Awaiting multiple promises inside an async function with try catch throw…

Bounty: 50

I can’t understand why the following code doesn’t throw:

const main = async () => {

    const Stop = (time) => new Promise((resolve) => setTimeout(resolve, time))

    try {
        const p1 = Stop(500).then(() => { throw new Error('Error ocurred') })
        const p2 = Stop(1000)

        await p1
        await p2
    } catch (err) {
        console.log('error catched')
    }
}

main()

But whenever I invert the order of p1 and p2 promises, like this:

const main = async () => {

    const Stop = (time) => new Promise((resolve) => setTimeout(resolve, time))

    try {
        const p1 = Stop(500).then(() => { throw new Error('Error ocurred') })
        const p2 = Stop(1000)

        await p2
        await p1
    } catch (err) {
        console.log('error catched')
    }
}

main()

Then an uncaught exception is thrown. I suppose that doing concurrent tasks like that without .catch functions is dangerous, but I thought the async code within a try catch would never throw.

Why isn’t this the case, exactly?


Get this bounty!!!

#StackBounty: #javascript #node.js #async-await #binance console.log not logging with await variable

Bounty: 100

I am trying to log the data of a promise to my console but it’s not showing. i have tried defining then in then and on top of functions and tried with let and redefining the before executing the algorithm but. no response

//////////////////////////////////

var trade;
const getTrades = async () => {
    return await axios({
        method: 'get',
        url: bUrl + tradeQuery
    })

}


const getSOrders = async () => {
    return await axios({
        method: 'get',
        url: bUrl + mOrderQuery
    })

}
const postOrder = async() => {
  const binanceRest = new api.BinanceRest({
      ...
    }
  )

  binanceRest.newOrder({
      ...
    })
    .then(async(data) => {
      const trades = await getTrades()
      const mOrders = await getSOrders()
      console.log(data)
      console.log(trades)
    })
    .catch((err) => {
      console.error(err)
    })
}

(
    postOrder(),
    async () => {
        const trades = await getTrades()
        const mOrders = await getSOrders()
        const sells = mOrders.data.asks
        const buys = mOrders.data.bids

        while (true/*while order is in */) {



            trade = trades.data[trades.data.length - 1]
             console.log(sells)
           
            
            
        }
    }


)()


Get this bounty!!!

#StackBounty: #javascript #node.js #mongodb #mongoose #async-await Mongoose pass data out of withTransaction helper

Bounty: 50

Introduction

Hey there,

I am trying to pass out data from the mongoose withTransaction callback. Right now, I am using the following code which implements callbacks:

const transactionSession = await mongoose.startSession()
        await transactionSession.withTransaction(async (tSession) => {
            try {
                // MARK Transaction writes & reads removed for brevity

                console.log("Successfully performed transaction!")
                cb(null, "Any test data")

                return Promise.resolve()
            } catch (error) {
                console.log("Transaction aborted due to error:", error)
                cb(error)

                return Promise.reject()
            }
        })

    } catch (error) {
        console.log(error)
        return cb(error)
    }

At the moment, I am using a callback to pass out data from the withTransactioncallback:

cb(null, "Any test data")

However, the problem is that naturally the callback is executed first, before the Promise.resolve() is returned. This means, that (in my case) a success response is sent back to the client before any necessary database writes are committed:

// this is executed first - the callback will send back a response to the client
cb(null, "Any test data")

// only now, after the response already got sent to the client, the transaction is committed.
return Promise.resolve()

Why I think this is a problem:

Honestly, I am not sure. It just doesn’t feel right to send back a success-response to the client, if there hasn’t been any database write at that time. Does anybody know the appropriate way to deal with this specific use-case?

I thought about passing data out of the withTransaction helper using something like this:

const transactionResult = await transactionSession.withTransaction({...})

I’ve tried it, and the response is a CommandResult of MongoDB, which does not include any of the data I included in the resolved promise.

Summary

Is it a problem, if a success response is sent back to the client before the transaction is committed? If so, what is the appropriate way to pass out data from the withTransaction helper and thereby committing the transaction before sending back a response?

I would be thankful for any advice I get.


Get this bounty!!!

#StackBounty: #python #django #async-await #websocket Sending notifications with Django channels

Bounty: 50

I have project on Django wich use Django Channels. I use Django Channel for sending notifications to users who are subscribed to articles changes (adding/editing/deleting comments on article).

So I’ve chosen this way of realization: every group of channels is an article and when changes happen, script sends notification to relevant groups. My code works correctly but I have some doubts if my choice of way of realization is most appropriate for this task. I need advice what is the best practice in my case?

Solution:

consumers.py

from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.db import database_sync_to_async
from project.apps.account.models import UserStatus
from .models import CommentSubscribe


class CommentNotificationConsumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        await self.accept()
        if not self.scope['user'].is_anonymous:
            groups = await database_sync_to_async(self.get_users_subscription)()
            await database_sync_to_async(self.change_user_status)(True)
            await self.add_subscriptions(groups)

    async def add_subscriptions(self, groups):
        for group in groups:
            await self.channel_layer.group_add(
                'article_{0}'.format(group.article_id),
                self.channel_name
            )

    async def receive_json(self, content, **kwargs):
        command = content.get('command', None)
        article_id = content.get('article_id', None)
        if command == 'subscribe':
            await self.subscribe(article_id)
        elif command == 'unsubscribe':
            await self.unsubscribe(article_id)
        else:
            await self.send_json({
                'error': 'unknown command'
            })

    async def disconnect(self, code):
        await database_sync_to_async(self.change_user_status)(False)

    async def send_notification(self, action):
        await self.send_json(action)

    async def subscribe(self, article_id):
        await self.channel_layer.group_add(
            'article_{0}'.format(article_id),
            self.channel_name
        )

    async def unsubscribe(self, article_id):
        await self.channel_layer.group_discard(
            'article_{0}'.format(article_id),
            self.channel_name
        )

    def get_users_subscription(self):
        return CommentSubscribe.objects.filter(
            user=self.scope['user']
        )

    def change_user_status(self, online):
        return UserStatus.objects.filter(
            user=self.scope['user']
        ).update(online=online)

views.py

from .notify import send_comment_notification

class CreateComment(CreateView):

    ...

    def form_valid(self, form):
        ...
        super().form_valid(form)
        send_comment_notification('create', article_id)


class UpdateComment(UpdateView):

    ...

    def form_valid(self, form):
        ...
        super().form_valid(form)
        send_comment_notification('update', article_id)


class DeleteComment(DeleteView):

    ...

    def delete(self, request, *args, **kwargs):
        ...
        send_comment_notification('delete', article_id)

notify.py

...

def send_comment_notification(action, article_id):
    channel_layer = get_channel_layer()
    group_name = 'article_{0}'.format(article_id)
    async_to_sync(channel_layer.group_send)(
        group_name,
        {
            'type': 'send.notification',
            'data': {
                'action': action
            }
        }
    )


Get this bounty!!!

#StackBounty: #c# #networking #async-await #task-parallel-library #tcp Socket application using TPL

Bounty: 50

This is an application I wrote that allows multiple TCP clients to share a single TCP connection to a remote server (hosted project, and a demo). Traffic generated by the server is forwarded to all clients. Traffic generated from any client is forwarded to the server. I don’t concern the case where client traffics are interleaved, as in my domain traffics are spontaneous and it’s affordable to have interleaved, invalid data.

The Goal

The goal for the code review is to examine if I am idiomatic on .NET socket programming and TPL programming, if I correctly handled task executions and cancellations, and if there is any performance concerns, e.g., unnecessarily blocking the socket communication, etc.

High Level Design

The project is hosted here. Feel free to read the README. Implementation wise, I designed it to be as responsive as possible – no socket connection is unnecessarily blocked.

  • There is an outbound queue. Each outbound packet from each client is put into this queue.
  • There is an inbound queue for each client. Each inbound packet is put into each of these queues.

For each client, there’re two async tasks, one for dequeueing from the client’s inbound queue (this can block when the queue is empty, which is desirable) and write the data to the client socket; one for reading data from the client socket and enqueueing to the outbound queue.

Similarly there’re two async tasks for the remote connection, one for dequeueing the outbound queue and write to the remote socket; one for reading from the remote socket and enqueueing to each client’s inbound queue.

There’s another async task accepting commands from stdin for connecting/disconnecting to the server, dumping diagnostics info, etc. But that’s not the main focus.


The entire code base is hosted here, but the most relevant classes are listed below.

Program.cs

namespace Multiplexer
{
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            var glob = new Global();
            var ctrl = new ControlChannel(glob);
            var clientServer = new ClientServer(3333, glob);

            Task.WaitAny(
                Task.Run(() => ctrl.Run(), glob.CancellationToken),
                Task.Run(() => clientServer.Run(), glob.CancellationToken));
        }
    }
}

ClientServer.cs

namespace Multiplexer
{
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading.Tasks;
    /// <summary>
    /// Listens to connection requests and manages client connections
    /// </summary>
    class ClientServer
    {
        /// <summary>
        /// Port to listen on for clients connections
        /// </summary>
        readonly int port;
        Global glob;

        public ClientServer(
            int port, 
            Global glob)
        {
            this.port = port;
            this.glob = glob;
        }

        /// <summary>
        /// Continuously listening for client connection requests
        /// </summary>
        public async Task Run()
        {
            var localserver = new TcpListener(IPAddress.Parse("127.0.0.1"), port);
            localserver.Start();

            while (true)
            {
                Console.WriteLine("Waiting for clients to connect...");

                // AcceptTcpClientAsync() does not accept a cancellation token. But it's OK since
                // in no case would I want the client listener loop to stop running during the entire
                // multiplexer lifecycle. For the sake of completeness, if it is necessary to cancel
                // this operation, one could use CancellationToken.Register(localserver.Stop).
                // See: http://stackoverflow.com/a/30856169/695964
                var client = await localserver.AcceptTcpClientAsync();

                var clientWrapper = new Client(client, glob.CancellationToken, Upload);
                Console.WriteLine($"Client connected: {clientWrapper}");

                // Register client
                glob.Clients[clientWrapper] = 0;

                // Unregister client when it is terminating
                clientWrapper.OnClose = () =>
                {
                    Console.WriteLine($"Removing client from clients list: {clientWrapper}");
                    byte c;
                    glob.Clients.TryRemove(clientWrapper, out c);
                };

                // Start the client. This is fire-and-forget. We don't want to await on it. I
                // t's OK because Start() has necessary logic to handle client termination and disposal.
                var tsk = clientWrapper.Start();
            }
        }

        /// <summary>
        /// Implementation of upload delegate to be called when there's data to upload to remote server
        /// </summary>
        /// <param name="data">the outbound data</param>
        void Upload(byte[] data)
        {
            // Do not enqueue data if remote is not connected (drop it)
            if (glob.Remote.Connected)
            {
                glob.UploadQueue.TryAdd(data);
            }
        }
    }
}

ControlChannel.cs

namespace Multiplexer
{
    using Newtonsoft.Json;
    using System;
    using System.Linq;
    using System.Net.Sockets;
    using System.Threading.Tasks;

    /// <summary>
    /// Class to handle multiplexer management commands.
    /// </summary>
    class ControlChannel
    {
        Global glob;
        public ControlChannel(Global glob)
        {
            this.glob = glob;
        }

        public void Run()
        {
            while (true)
            {
                var line = Console.ReadLine();

                var toks = line.Split();
                switch (toks[0])
                {
                    // Connecting to remote server
                    // connect <remote_server> <port>
                    case "connect":
                        if (glob.Remote.Connected)
                        {
                            Console.WriteLine("Error: Remote is connected. Try disconnect first");
                        }
                        else
                        {
                            // Reset the upload queue so stale outbound data is not uploaded to the new
                            // connection
                            glob.ResetUploadQueue();

                            // This is a fire-and-forget task. It is the responsibility of
                            // the task to properly handle resource clean up.
                            Task.Run(() => StartServer(toks[1], int.Parse(toks[2])));
                        }
                        break;

                    // Disconnecting:
                    // disconnect
                    case "disconnect":
                        if (glob.Remote.Connected)
                        {
                            Console.WriteLine("Disconnecting");

                            // cancel the global cancellation token. This would disconnect the server and all 
                            // clients. It is desirable to disconnect the clients to maintain the equivelency 
                            // when a client is directly connected to the server.
                            glob.Cancel();
                        }
                        else
                        {
                            Console.WriteLine("Not connected");
                        }
                        break;

                    // Dump multiplexer status
                    // info|stats
                    case "info":
                    case "stats":
                        DumpStats();
                        break;

                    // Quit the multiplexer application
                    case "quit":
                        Console.WriteLine("Exiting...");
                        glob.Cancel();
                        return; // terminate the control channel loop

                    default:
                        Console.WriteLine("Unknown command: " + line);
                        break;
                }
            }
        }

        /// <summary>
        /// Dump multiplxer info
        /// </summary>
        private void DumpStats()
        {
            var info = new
            {
                Remote = glob.Remote,
                Clients = glob.Clients.Select(c => c.ToString()).ToArray(),
                UploadQueue = glob.UploadQueue.Select(msg => System.Text.Encoding.UTF8.GetString(msg)).ToArray(),
            };
            Console.WriteLine(JsonConvert.SerializeObject(info, Formatting.Indented));
        }

        /// <summary>
        /// Start the remote connection
        /// </summary>
        /// <param name="hostname">remote hostname</param>
        /// <param name="port">remote port</param>
        async Task StartServer(string hostname, int port)
        {
            Console.WriteLine($"Connecting to {hostname}:{port}");
            var remote = new TcpClient(hostname, port);
            var server = new Remote(remote, glob.UploadQueue, glob.CancellationToken, /* receive: */ data =>
            {
                // Implementation of receive() is to put inbound data to each of the client queues.
                // Note that this is non-blocking. If any queue is full, the data is dropped from that
                // queue.
                foreach (var client in glob.Clients)
                {
                    client.Key.DownlinkQueue.TryAdd(data);
                }
            });

            // Register the remote connection globally so everyone is aware of the connection status. This is 
            // essential for several requirements, e.g., client data should not be added to outbound queue if
            // there's no connection.
            glob.RegisterRemote(server);

            try
            {
                // Start and wait for the remote connection to terminate
                await server.Start();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                Console.WriteLine($"Disposing remote connection: {server}");
                server.Dispose();
                server = null;

                // When remote connection is terminated. Also disconnects all the clients.
                glob.Cancel();
            }
        }
    }
}

Remote.cs

namespace Multiplexer
{
    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Net.Sockets;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// An interface to expose read-only remote connection information
    /// </summary>
    public interface IRemoteInfo
    {
        /// <summary>
        /// Whether connected to the remote service
        /// </summary>
        bool Connected { get; }

        /// <summary>
        /// Remote address
        /// </summary>
        string RemoteAddress { get; }
    }

    /// <summary>
    /// Class to manage connection to the remote server
    /// </summary>
    class Remote : IDisposable, IRemoteInfo
    {
        readonly TcpClient client;
        readonly NetworkStream stream;

        /// <summary>
        /// A delegate called on receiving a package. Implementation could be submitting the package to the queue.
        /// </summary>
        readonly Action<byte[]> receive;

        /// <summary>
        /// Queue for data to be uploaded to remote server
        /// </summary>
        readonly BlockingCollection<byte[]> uplinkQueue;

        /// <summary>
        /// a linked cancellation token that is cancelled when:
        ///   - external cancellation is requested, or
        ///   - the token of the linked CTS is cancelled
        /// Note that the cancellation of the linked source won't propagate to the external token
        /// </summary>
        readonly CancellationTokenSource linkedCTS;

        public bool Connected => client.Connected;
        public string RemoteAddress => $"{client.Client.RemoteEndPoint}";

        public Remote(
            TcpClient client,
            BlockingCollection<byte[]> uplinkQueue,
            CancellationToken externalCancellationToken, 
            Action<byte[]> receive)
        {
            this.client = client;
            this.uplinkQueue = uplinkQueue;
            this.receive = receive;
            linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(externalCancellationToken);
            stream = client.GetStream();
        }

        /// <summary>
        /// Async task to handle downlink (remote -> multiplexer) traffic
        /// 
        /// This is to read from the socket and put data into the downlink queue (via receive())
        /// </summary>
        async Task HandleDownlink()
        {
            linkedCTS.Token.ThrowIfCancellationRequested();
            int c;
            byte[] buffer = new byte[256];
            while ((c = await stream.ReadAsync(buffer, 0, buffer.Length, linkedCTS.Token)) > 0)
            {
                // Receive is non-blocking
                receive(buffer.Take(c).ToArray());
            }
        }

        /// <summary>
        /// Async task to handle uplink (multiplexer -> remote) traffic
        /// 
        /// This is to take data from the uplink queue and write into the socket.
        /// </summary>
        async Task HandleUplink()
        {
            linkedCTS.Token.ThrowIfCancellationRequested();
            byte[] data;

            // Taking from the queue can be blocked if there's nothing in the queue for consumption
            while (null != (data = uplinkQueue.Take(linkedCTS.Token)))
            {
                await stream.WriteAsync(data, 0, data.Length, linkedCTS.Token);
            }
        }

        /// <summary>
        /// Async task to start and wait for the uplink and downlink handlers
        /// </summary>
        public async Task Start()
        {
            try
            {
                var downlinkTask = Task.Run(HandleDownlink, linkedCTS.Token);
                var uplinkTask = Task.Run(HandleUplink, linkedCTS.Token);

                // If either task returns, the connection is considered to be terminated.
                await await Task.WhenAny(downlinkTask, uplinkTask);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                // Cancel the other task (uplink or downlink)
                linkedCTS.Cancel();
                Console.WriteLine("Remote connection exited.");
                this.Dispose();
            }
        }

        public void Dispose()
        {
            Console.WriteLine("Disposing of remote connection");
            linkedCTS.Dispose();
            stream.Dispose();
            client.Close();
        }
    }
}

Client.cs

namespace Multiplexer
{
    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Net.Sockets;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// A class to handle local client connections (client - multiplexer)
    /// </summary>
    class Client : IDisposable
    {
        readonly TcpClient client;

        /// <summary>
        /// A delegate to invoke when receiving data from the client socket that should be uploaded.
        /// 
        /// Implementation should put this to the outbound queue. This should be non-blocking.
        /// </summary>
        readonly Action<byte[]> upload;
        readonly NetworkStream stream;

        /// <summary>
        /// A queue containing data from remote server that should be delivered to this client
        /// </summary>
        readonly BlockingCollection<byte[]> downlinkQueue = new BlockingCollection<byte[]>();

        /// <summary>
        /// A cancellation token source linked with an external token
        /// </summary>
        readonly CancellationTokenSource cts;

        public BlockingCollection<byte[]> DownlinkQueue
        {
            get
            {
                return this.downlinkQueue;
            }
        }

        /// <summary>
        /// A delegate to be called when the client is closed. The <see cref="ClientServer"/> uses this to 
        /// properly remove the client from the clients list.
        /// </summary>
        public Action OnClose { get; set; }

        public Client(TcpClient client, CancellationToken externalCancellationToken, Action<byte[]> upload)
        {
            this.client = client;
            this.upload = upload;
            this.cts = CancellationTokenSource.CreateLinkedTokenSource(externalCancellationToken);
            this.stream = client.GetStream();
        }

        /// <summary>
        /// Start the client traffic
        /// </summary>
        public async Task Start()
        {
            var uplinkTask = Task.Run(HandleUplink, cts.Token);
            var downlinkTask = Task.Run(HandleDownlink, cts.Token);

            try
            {
                // Await for either of the downlink or uplink task to finish
                await await Task.WhenAny(uplinkTask, downlinkTask);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
            finally
            {
                // Cancel the other task (uplink or downlink)
                cts.Cancel();
                Console.WriteLine("Client closing");
                Dispose();
            }
        }

        /// <summary>
        /// Handle uplink traffic (client -> multiplexer -> remote)
        /// </summary>
        async Task HandleUplink()
        {
            cts.Token.ThrowIfCancellationRequested();
            int c;
            byte[] buffer = new byte[256];
            while ((c = await stream.ReadAsync(buffer, 0, buffer.Length, cts.Token)) > 0)
            {
                upload(buffer.Take(c).ToArray());
            }
        }

        /// <summary>
        /// Handle downlink traffic (remote -> multiplexer -> client)
        /// </summary>
        async Task HandleDownlink()
        {
            cts.Token.ThrowIfCancellationRequested();
            byte[] data;

            // This would block if the downlink queue is empty
            while (null != (data = downlinkQueue.Take(cts.Token)))
            {
                await stream.WriteAsync(data, 0, data.Length, cts.Token);
            }
        }

        public override string ToString()
        {
            return client.Client.RemoteEndPoint.ToString();
        }

        public void Dispose()
        {
            Console.WriteLine($"Disposing of client: {this}");
            OnClose();
            cts.Dispose();
            stream.Dispose();
            client.Close();
        }
    }
}

Global.cs

namespace Multiplexer
{
    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    /// <summary>
    /// A class to hold common dependencies to other classes
    /// </summary>
    /// <remarks>
    /// This used to be a singleton and referenced directly by other code, hence the name "Global".
    /// I changed it to be dependencies passed as constructor parameters of other classes so it's 
    /// easier to write tests.
    /// </remarks>
    class Global
    {
        /// <summary>
        /// Queue for data to be uploaded to remote server (client -> remote)
        /// </summary>
        public BlockingCollection<byte[]> UploadQueue => uploadQueue;

        /// <summary>
        /// Set of connected clients. This is used as a set (only keys are used), but there's no ConcurrentSet.
        /// </summary>
        public ConcurrentDictionary<Client, byte> Clients => clients;

        /// <summary>
        /// A cancellation token for disconnection. This is used to cancel the remote connection and client connections.
        /// </summary>
        public CancellationToken CancellationToken => cts.Token;

        /// <summary>
        /// A readonly object holding status for the remote connection
        /// </summary>
        public IRemoteInfo Remote => remote ?? new DummyRemote();

        private BlockingCollection<byte[]> uploadQueue = new BlockingCollection<byte[]>();
        private ConcurrentDictionary<Client, byte> clients = new ConcurrentDictionary<Client, byte>();
        private CancellationTokenSource cts = new CancellationTokenSource();
        private IRemoteInfo remote;

        /// <summary>
        /// Register remote connection
        /// </summary>
        public IRemoteInfo RegisterRemote(IRemoteInfo remote)
        {
            return Interlocked.Exchange(ref this.remote, remote);
        }

        /// <summary>
        /// Cancel the global cancellation token. Used for disconnecting remote and clients
        /// </summary>
        public void Cancel()
        {
            if (!cts.Token.IsCancellationRequested)
            {
                lock (cts)
                {
                    if (!cts.Token.IsCancellationRequested)
                    {
                        cts.Cancel();
                        cts.Dispose();
                        cts = new CancellationTokenSource();
                    }
                }
            }
        }

        /// <summary>
        /// Clear the upload queue. Used at each new remote connection.
        /// </summary>
        public void ResetUploadQueue()
        {
            Console.WriteLine($"Resetting upload queue ({uploadQueue.Count})");
            byte[] b;
            while (uploadQueue.TryTake(out b)) { }
        }

        /// <summary>
        /// A dummy remote info to avoid null referencing when no remote server is connected
        /// </summary>
        class DummyRemote : IRemoteInfo
        {
            public bool Connected => false;
            public string RemoteAddress => "";
        }
    }
}


Get this bounty!!!

#StackBounty: #javascript #error-handling #async-await #try-catch #stack-trace Passing an async function as a callback causes the error…

Bounty: 200

I’m trying to write a function that will reintroduce a stack trace when an object literal is thrown. (See this related question).

What I’ve noticed is that if a pass an async function as a callback into another async caller function, if the caller function has a try/catch, and catches any errors, and throws a new Error, then the stack trace gets lost.

I’ve tried several variants of this:

function alpha() {
  throw Error("I am an error!");
}

function alphaObectLiberal() {
  throw "I am an object literal!";  //Ordinarily this will cause the stack trace to be lost. 
}

function syncFunctionCaller(fn) {
  return fn();
}

function syncFunctionCaller2(fn) { //This wrapper wraps it in a proper error and subsequently preserves the stack trace. 
  try {
    return fn();
  } catch (err) {
    throw new Error(err); //Stack trace is preserved when it is synchronous. 
  }
}


async function asyncAlpha() {
  throw Error("I am also an error!"); //Stack trace is preseved if a proper error is thown from callback
}

async function asyncAlphaObjectLiteral() {
  throw "I am an object literal!"; //I want to catch this, and convert it to a proper Error object. 
}

async function asyncFunctionCaller(fn) {
  return await fn();
}

async function asyncFunctionCaller2(fn) {
  try {
    await fn();
  } catch (err) {
    throw new Error(err);
  }
}

async function asyncFunctionCaller3(fn) {
  try {
    await fn();
  } catch (err) {
    throw new Error("I'm an error thrown from the function caller!");
  }
}

async function asyncFunctionCaller4(fn) {
  throw new Error("No try catch here!");
}

async function everything() {
  try {
    syncFunctionCaller(alpha);
  } catch (err) {
    console.log(err);
  }


  try {
    syncFunctionCaller2(alphaObectLiberal);
  } catch (err) {
    console.log(err);
  }

  try {
    await asyncFunctionCaller(asyncAlpha);
  } catch (err) {
    console.log(err);
  }

  try {
    await asyncFunctionCaller2(asyncAlphaObjectLiteral);
  } catch (err) {
    console.log(err); //We've lost the `everthing` line number from the stack trace
  }

  try {
    await asyncFunctionCaller3(asyncAlphaObjectLiteral);
  } catch (err) {
    console.log(err); //We've lost the `everthing` line number from the stack trace
  }

  try {
    await asyncFunctionCaller4(asyncAlphaObjectLiteral);
  } catch (err) {
    console.log(err); //This one is fine
  }
}

everything();

(Code Sandbox)

Output: note my comments in the stack trace

[nodemon] starting `node src/index.js localhost 8080`
Error: I am an error!
    at alpha (/sandbox/src/index.js:2:9)
    at syncFunctionCaller (/sandbox/src/index.js:6:10)
    at everything (/sandbox/src/index.js:43:5) 
    //We can see what function caused this error
    at Object.<anonymous> (/sandbox/src/index.js:73:1)
    at Module._compile (internal/modules/cjs/loader.js:776:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:787:10)
    at Module.load (internal/modules/cjs/loader.js:653:32)
    at tryModuleLoad (internal/modules/cjs/loader.js:593:12)
    at Function.Module._load (internal/modules/cjs/loader.js:585:3)
    at Function.Module.runMain (internal/modules/cjs/loader.js:829:12)
Error: I am an object literal!
    at syncFunctionCaller2 (/sandbox/src/index.js:17:11)
    at everything (/sandbox/src/index.js:65:5)
    //In a synchronous wrapper, the stack trace is preserved
    at Object.<anonymous> (/sandbox/src/index.js:95:1)
    at Module._compile (internal/modules/cjs/loader.js:776:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:787:10)
    at Module.load (internal/modules/cjs/loader.js:653:32)
    at tryModuleLoad (internal/modules/cjs/loader.js:593:12)
    at Function.Module._load (internal/modules/cjs/loader.js:585:3)
    at Function.Module.runMain (internal/modules/cjs/loader.js:829:12)
    at startup (internal/bootstrap/node.js:283:19)
Error: I am also an error!
    at asyncAlpha (/sandbox/src/index.js:10:9)
    at asyncFunctionCaller (/sandbox/src/index.js:18:16)
    at everything (/sandbox/src/index.js:49:11) 
    //We can see what function caused this error
    at Object.<anonymous> (/sandbox/src/index.js:73:1)
    at Module._compile (internal/modules/cjs/loader.js:776:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:787:10)
    at Module.load (internal/modules/cjs/loader.js:653:32)
    at tryModuleLoad (internal/modules/cjs/loader.js:593:12)
    at Function.Module._load (internal/modules/cjs/loader.js:585:3)
    at Function.Module.runMain (internal/modules/cjs/loader.js:829:12)
Error: I am an object literal!
    at asyncFunctionCaller2 (/sandbox/src/index.js:25:11) 
   //We've lost the stacktrace in `everything`
    at process._tickCallback (internal/process/next_tick.js:68:7)
    at Function.Module.runMain (internal/modules/cjs/loader.js:832:11)
    at startup (internal/bootstrap/node.js:283:19)
    at bootstrapNodeJSCore (internal/bootstrap/node.js:622:3)
Error: I'm an error thrown from the function caller!
    at asyncFunctionCaller3 (/sandbox/src/index.js:33:11)
    //We've lost the stacktrace in `everything`
    at process._tickCallback (internal/process/next_tick.js:68:7)
    at Function.Module.runMain (internal/modules/cjs/loader.js:832:11)
    at startup (internal/bootstrap/node.js:283:19)
    at bootstrapNodeJSCore (internal/bootstrap/node.js:622:3)
Error: No try catch here!
    at asyncFunctionCaller4 (/sandbox/src/index.js:38:9)
    at everything (/sandbox/src/index.js:67:11)
    //We can see what function caused this error
    at process._tickCallback (internal/process/next_tick.js:68:7)
    at Function.Module.runMain (internal/modules/cjs/loader.js:832:11)
    at startup (internal/bootstrap/node.js:283:19)
    at bootstrapNodeJSCore (internal/bootstrap/node.js:622:3)
[nodemon] clean exit - waiting for changes before restart

It seems to me that the await statement is what is screwing this up.

What’s going on here?


Get this bounty!!!