I am using socket.io to let the Swift client of my app communicate with the server. Essentially, the client joins a socket connection upon opening the app and a job is instantly added to a Redis queue (the job takes anywhere from a few seconds to about 15 seconds). The server responds to the client with the job id. While this job is processing, SOMETIMES the client will disconnect. There doesn't seem to be any rhyme or reason behind this: the time of disconnection is totally inconsistent, and the disconnection doesn't happen at a specific point in the function. I thought maybe I was manually disconnecting from the client side, so I set up socket emissions right before each disconnect call on the client side (when the server receives one of these emissions, it prints something that tells me where the disconnect came from). This showed me that the disconnect is automatic, because no such emission is ever received before the socket connection ends. This is running on Heroku. Here's my code:

//queue initialization
const queue = new Queue('queue', process.env.REDIS_URL)

//client pings this endpoint to get the job id in the queue
app.post('/process', async function(request, response) {
  let job = await queue.add({request: request.body});
  console.log("Logging job as " + job.id)
  response.json({ id: job.id });
});

queue.process(10, async (job) => { //10 is the max workers per job
    console.log("Started processing")
    const client = await pool.connect()
    let item = job.data.request
    let title = item.title
    let subtitle = item.subtitle
    let id = item.id
    io.to(id).emit("Processing1", ""); //added emissions like these because I thought maybe the socket was timing out, but this didn't help
    console.log("Processing1");

    try {
      await client.query('BEGIN')
        let geoData = await //promise of geocoding endpoint api function
        let lengthOfGeoData = geoData.context.length
        io.to(id).emit("Processing2", "");
        console.log("Processing2");
        var municipality = ""
        var area = ""
        var locality = ""
        var place = ""
        var district = ""
        var region = ""
        var country = ""
        //for loop to go through geoData and set the above values
      if (municipality != "") {
        console.log("Signing in from " + municipality + ", " + area);
      } else {
        console.log("Signing in from " + area)
      }
      await scrape(municipality, area, id);
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log(err)
    }
    try {
      await client.query('BEGIN')
      const array = await //a function that queries a Postgres db for some rows, makes json objects out of them, and pushes to the 'array' variable
      var array2 = []
      for (a of array) {
        let difference = getDifference(title, subtitle, a.title, a.subtitle) //math function
        if (difference <= 10) {
          array.push(a)
        }
      }
      io.to(id).emit("Processing9", "");
      console.log("Processing9");
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log("ERROR: Failed arrayHelperFunction")
      console.log(err)
    } finally {
      client.release()
      console.log("About to emit this ish to " + id) //should emit to socket here ideally to notify that the processing is done and results can be polled
      io.to(id).emit("finishedLoading", "")
      return array2;
    }
});

//when the client polls the queue after it's received the 'done' notifier from the server
app.post('/poll', async function(request, response) {
  console.log("Polling")
  let id = request.body.id
  const results = await queue.getJob(id);
  for (r of results.returnvalue) {
    console.log("Sending " + r.title);
  }
  response.send(results.returnvalue)
});

//scrape
async function scrape(municipality, area, id) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN')
    var location = ""
    if (municipality != "") {
      location = municipality + ", " + area
    } else {
      location = area
    }
    let inDatabase = await client.query('SQL statement AS it_does_exist', [params]);
    io.to(id).emit("Processing3", "");
    console.log("Processing3");
    if (inDatabase.rows[0].it_does_exist == false) { 
      let query = "book clubs near " + location
      var terminationTime = new Date()
      terminationTime.setHours(terminationTime.getHours() + 4);
      let date = ("0" + terminationTime.getDate()).slice(-2);
      let month = ("0" + (terminationTime.getMonth() + 1)).slice(-2);
      let year = terminationTime.getFullYear();
      let hours = terminationTime.getHours();
      let minutes = terminationTime.getMinutes();
      let seconds = terminationTime.getSeconds();
      let timestamp = year + "-" + month + "-" + date + " " + hours + ":" + minutes + ":" + seconds

      try {
        await client.query(`SQL statement`, [params]);
      } catch(err) {
        console.log("FAILURE: scrape() at 1.")
        console.log(err)
      }

      var queryLocation = "New York,New York,United States" //default search origination is here
      var queryGLCode = "US"
      io.to(id).emit("Processing4", "");
      console.log("Processing4");
      try {
        await fetch('https://serpapi.com/locations.json?q='+municipality+'&limit=10', { method : "GET" })
          .then(res => res.json())
          .then((json) => {
            for (let index = 0; index < 10; index++) {
              let locationAPIName = json[index].canonical_name
              let locationAPICode = json[index].country_code
              let resultLatitude = json[index].gps[1];
              let resultLongitude = json[index].gps[0];
            }
          });
      } catch(err) {
        console.log("FAILURE: scrape() at 2.")
        console.log(err)
      }
      io.to(id).emit("Processing5", "");
      console.log("Processing5");
      try {
        await Promise.all([
          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode}).then(data => async function(){
            try {
              await client.query('BEGIN');
              let results = data.events_results
              if (results != null) {
                console.log("first HAD results")
                for (result of results) {
                  var fixedAddress = result.address[0]
                  let address = fixedAddress + ", " + result.address[1]
                      
                  let title = result.title + address

                  var description = result.description

                  let geoData = await geocode(address); //mapbox geocode the address
                  let latitude = Number(geoData.center[0]);
                  let longitude = Number(geoData.center[1]);
                  
                    await client.query(`SQL statement`, [params]);
                  
                }
                io.to(id).emit("Processing6", "");
                console.log("Processing6");
              } else {
                console.log("first DID NOT have results")
              }
              console.log("FIRST BLOCK")
              await client.query('COMMIT');
            } catch(err) {
              console.log("Results[0] not found.")
              console.log(err)
              await client.query('ROLLBACK');
            }
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "10"}).then(data => async function(){
            // same as the one above, just with an offset
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "20"}).then(data => async function(){
            // same as the one above, but with a different offset
          }())
        ])
      } catch(err) {
        console.log("FAILURE: scrape() at 3.")
        console.log(err)
      }

    } else {
      console.log("Location already in the database.")
    }
    await client.query('COMMIT')
  } catch(err) {
    await client.query('ROLLBACK')
    console.log(err)
  } finally {
    client.release()
    return "Resolved";
  }
}

//Client establish socket connection
func establishConnection(_ completion: (() -> Void)? = nil) {
    let socketUrlString: String = appState.server
    self.manager = SocketManager(socketURL: URL(string: socketUrlString)!, config: [.log(false), .reconnects(true), .extraHeaders(["header": "customheader"])])
    self.socket = manager?.defaultSocket
    self.socket?.connect()
    self.socket?.once(clientEvent: .connect, callback: { (data, emitter) in
        if completion != nil{
            completion!()
        }
    })
  //other socket functions
}

//Client initial post request
func process() {
    let server = "serverstring" + "process"
    let title = "title"
    let subtitle = "subtitle"
    let package = BookPackage(title: title, subtitle: subtitle, id: mySocketID) //this is after the initial connection
    print("package is \(package)")
            
    guard let url  = URL(string: server) else { return }

    var urlRequest = URLRequest(url: url)
    
    urlRequest.addValue("application/json", forHTTPHeaderField: "Content-Type")
    urlRequest.addValue("application/json", forHTTPHeaderField: "Accept")
    
    urlRequest.httpMethod = "POST"
    
    guard let data = try? JSONEncoder().encode(package) else { return }
            
    urlRequest.httpBody = data

    let task = URLSession.shared.dataTask(with: urlRequest) {
        (data, response, error) in
        if let error = error {
            print(error)
            return
        }
        guard let data = data else { return }
        guard let dataString = String(data: data, encoding: String.Encoding.utf8) else { return }
        let jsonData = Data(dataString.utf8)
        var decodedJob: Job? = nil
        do {
            decodedJob = try JSONDecoder().decode(Job.self, from: jsonData) //Job is just a struct in the same form as the json object sent back from the server
        } catch {
            print(error.localizedDescription)
        }
        DispatchQueue.main.async {
            self.appState.pendingJob = decodedJob
        }
    }
    // start the task
    task.resume()
}

The only consistent part of this bug is the logs right before the user disconnects (side note: 'reason of disconnect' and 'DISCONNECTED USER' fire on the socket.on('disconnect') event):

https://i.sstatic/7fjuU.png

https://i.sstatic/z5bmL.png

https://i.sstatic/aHNt3.png

https://i.sstatic/64WYI.png

asked Sep 25, 2021 at 3:16 by nickcoding2, edited Oct 1, 2021 at 11:57
  • 1 Is the long function blocking the event queue? – jfriend00 Commented Sep 25, 2021 at 5:26
  • 1 Can you show how the client makes the request that starts all this off? Is it from Javascript or a browser form post? – jfriend00 Commented Sep 25, 2021 at 5:30
  • 1 You're the one who used the term "long function" in the title of your question. Please don't ask me what it is. I'm asking you. – jfriend00 Commented Sep 25, 2021 at 15:27
  • 2 So, the client code runs on an iPhone (since that looks like Swift code)? Perhaps there are power management issues asserted by the iPhone that cause it to drop an inactive socket.io connection? This looks like it's probably a client-side issue. I'd suggest searching for iOS problems with socket.io connections dropping and see if any of the zillions of hits you find looks like your circumstance. – jfriend00 Commented Sep 25, 2021 at 15:35
  • 2 Setting the right environment variables will automatically enable debugging on the server. For the client, it appears you have to set a localStorage value. The idea is that the debugging code is already in socket.io, you just have to enable it with the right settings. You will probably have to restart the server with the right settings for anything to take effect. – jfriend00 Commented Sep 26, 2021 at 14:37

3 Answers

The solution to your problem is to increase the pingTimeout when creating the server.

From Socket.io:

The server sends a ping, and if the client does not answer with a pong within pingTimeout ms, the server considers that the connection is closed.

Similarly, if the client does not receive a ping from the server within pingInterval + pingTimeout ms, the client also considers that the connection is closed.

const io = new Server(httpServer, {
  pingTimeout: 30000
});
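The two rules quoted above can be worked through as plain arithmetic. The following is only a minimal sketch: `heartbeatDeadlines` is a hypothetical helper, not part of Socket.IO, and the interval value is chosen purely for illustration, so check the defaults of the version you run.

```javascript
// Hypothetical helper (not a Socket.IO API): derives the two heartbeat
// deadlines described in the quoted documentation.
function heartbeatDeadlines({ pingInterval, pingTimeout }) {
  return {
    // Server side: how long the server waits for a pong after sending a ping.
    serverWaitsForPongMs: pingTimeout,
    // Client side: the longest silence the client tolerates before it
    // considers the connection closed.
    clientWaitsForPingMs: pingInterval + pingTimeout,
  };
}

// Illustrative values only: pingTimeout matches the snippet above,
// pingInterval is an assumption.
const deadlines = heartbeatDeadlines({ pingInterval: 25000, pingTimeout: 30000 });
console.log(deadlines);
```

If the server's event loop is stalled for longer than these windows, either side may decide the connection is dead, which would be consistent with disconnects that appear at random points in the job.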

You are probably blocking the event loop during the long job. Note that await by itself yields to the event loop; it is long synchronous work between awaits that blocks it. Socket.IO keeps the connection alive with a periodic heartbeat (governed by pingInterval and pingTimeout).

If the heartbeat is not answered in time, the connection is considered closed and the client is disconnected.

You should isolate this process: run it in a worker or background process, or make it fully asynchronous. Increasing pingTimeout on the server side might also help.
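One way to keep a long loop from starving the heartbeat is to split it into batches and yield to the event loop between them. This is a rough sketch under assumptions, not the asker's code: `processInChunks`, `handleItem`, and the chunk size are hypothetical names and values introduced for illustration, and it only helps when the blocking work is a synchronous loop.

```javascript
// Hypothetical helper: applies a synchronous handler to every item, but
// pauses between batches so timers and socket I/O can run.
async function processInChunks(items, handleItem, chunkSize = 100) {
  const results = [];
  for (let start = 0; start < items.length; start += chunkSize) {
    // Handle one batch synchronously.
    for (const item of items.slice(start, start + chunkSize)) {
      results.push(handleItem(item));
    }
    // setImmediate resumes this function after the current poll phase,
    // so pending I/O callbacks get a turn before the next batch.
    await new Promise((resolve) => setImmediate(resolve));
  }
  return results;
}

// Usage: double 1000 numbers in batches of 50.
const numbers = Array.from({ length: 1000 }, (_, i) => i);
const done = processInChunks(numbers, (n) => n * 2, 50).then((doubled) => {
  console.log("processed", doubled.length, "items");
  return doubled;
});
```

For CPU-heavy work that cannot be split this way, moving the job into a separate worker process, as suggested above, is the more robust choice.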

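You can also set the transports explicitly when creating the server (note that ['polling', 'websocket'] is already the server default in recent Socket.IO versions):

const io = new Server(httpServer, {
  transports: ['polling', 'websocket'],
});

This might resolve the issue. Otherwise, you can also try changing the upgradeTimeout and pingTimeout.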

Article tags: javascript; socket.io client automatically disconnecting in long Node.js function (Stack Overflow)