Chilkat HOME .NET Core C# Android™ AutoIt C C# C++ Chilkat2-Python CkPython Classic ASP DataFlex Delphi ActiveX Delphi DLL Go Java Lianja Mono C# Node.js Objective-C PHP ActiveX PHP Extension Perl PowerBuilder PowerShell PureBasic Ruby SQL Server Swift 2 Swift 3,4,5... Tcl Unicode C Unicode C++ VB.NET VBScript Visual Basic 6.0 Visual FoxPro Xojo Plugin
(CkPython) Firebase Receive Server-Sent Events (text/event-stream)Demonstrates how to start receiving server-sent events and update your JSON database with each event.
import sys import chilkat # Demonstrates how to begin receiving server-sent events, and to update # your JSON database for each event. # This example requires the Chilkat API to have been previously unlocked. # See Global Unlock Sample for sample code. # This example assumes a JWT authentication token, if required, has been previously obtained. # See Get Firebase Access Token from JSON Service Account Private Key for sample code. # Load the previously obtained Firebase access token into a string. fac = chilkat.CkFileAccess() accessToken = fac.readEntireTextFile("qa_data/tokens/firebaseToken.txt","utf-8") if (fac.get_LastMethodSuccess() != True): print(fac.lastErrorText()) sys.exit() rest = chilkat.CkRest() # Make the initial connection (without sending a request yet). # Once connected, any number of requests may be sent. It is not necessary to explicitly # call Connect before each request. success = rest.Connect("chilkat.firebaseio.com",443,True,True) if (success != True): print(rest.lastErrorText()) sys.exit() authGoogle = chilkat.CkAuthGoogle() authGoogle.put_AccessToken(accessToken) rest.SetAuthGoogle(authGoogle) rest.AddHeader("Accept","text/event-stream") rest.AddHeader("Cache-Control","no-cache") responseBody = rest.fullRequestNoBody("GET","/.json") # A 307 redirect response is expected. if (rest.get_ResponseStatusCode() != 307): print("Unexpected response code: " + str(rest.get_ResponseStatusCode())) print(responseBody) print("Failed.") sys.exit() # Get the redirect URL # url is a CkUrl url = rest.RedirectUrl() if (rest.get_LastMethodSuccess() != True): print(rest.lastErrorText()) sys.exit() print("redirect URL domain: " + url.host()) print("redirect URL path: " + url.path()) print("redirect URL query params: " + url.query()) print("redirect URL path with query params: " + url.pathWithQueryParams()) # Our text/event-stream will be obtained from the redirect URL... rest2 = chilkat.CkRest() success = rest2.Connect(url.host(),443,True,True) if (success != True): print(rest2.lastErrorText()) sys.exit() rest2.AddHeader("Accept","text/event-stream") rest2.AddHeader("Cache-Control","no-cache") # Add the redirect query params to the request rest2.AddQueryParams(url.query()) # In our case, we don't actually need the auth query param, # so remove it. rest2.RemoveQueryParam("auth") # Send the request. (We are only sending the request here. # We are not yet getting the response because the response # will be a text/event-stream.) success = rest2.SendReqNoBody("GET",url.path()) if (success != True): print(rest2.lastErrorText()) sys.exit() # Read the response header. # We want to first get the response header to see if it's a successful # response status code. If not, then the response will not be a text/event-stream # and we should read the response body normally. responseStatusCode = rest2.ReadResponseHeader() if (responseStatusCode < 0): print(rest2.lastErrorText()) sys.exit() # If successful, a 200 response code is expected. # If the reponse code is not 200, then read the response body and fail.. if (responseStatusCode != 200): print("Response Code: " + str(responseStatusCode)) print("Response Status Text: " + rest2.responseStatusText()) print("Response Header: " + rest2.responseHeader()) responseBody = rest2.readRespBodyString() if (rest2.get_LastMethodSuccess() == True): print("Error Response Body: " + responseBody) print("Failed.") sys.exit() # For this example, our JSON database will be empty at the beginning. # The incoming events (put and patch) will be applied to this database. jsonDb = chilkat.CkJsonObject() # Make sure to set the JSON path delimiter to "/". The default is "." and this # is not compatible with Firebase paths. jsonDb.put_DelimiterChar("/") # At this point, we've received the response header. Now it's time to begin # receiving the event stream. We'll start a background thread to read the # stream. (Our main application (foreground) thread can cancel it at any time.) # While receiving in the background thread, our foreground thread can read the stream # as it desires.. eventStream = chilkat.CkStream() # This sse object will be used as a helper to parse the server-sent event stream. sse = chilkat.CkServerSentEvent() # task is a CkTask task = rest2.ReadRespBodyStreamAsync(eventStream,True) task.Run() # For this example, we'll just read a few events, and then cancel the # async task. count = 0 while (count < 3) and (task.get_Finished() == False) : # Get the next event, which is a series of text lines ending with # a blank line. # Note: This method blocks the calling thread until a message arrives. # a program might instead periodically check the availability of # data via the stream's DataAvailable property, and then do the read. # An alternative to writing a while loop to read the event stream # would be to setup some sort of timer event in your program (using whatever timer functionality # is provided in a programming language/environment), to periodically check the eventStream's # DataAvailable property and consume the incoming event. eventStr = eventStream.readUntilMatch("\r\n\r\n") if (eventStream.get_LastMethodSuccess() != True): print(eventStream.lastErrorText()) # Force the loop to exit by setting the count to a high number. count = 99999 else: print("Event: [" + eventStr + "]") # We have an event. Let's update our local copy of the JSON database. success = sse.LoadEvent(eventStr) if (success != True): print("Failed to load sse event: " + eventStr) else: # Now we can easily access the event name and data, and apply it to our JSON database: success = jsonDb.FirebaseApplyEvent(sse.eventName(),sse.data()) if (success != True): print("Failed to apply event: " + sse.eventName() + ": " + sse.data()) else: print("Successfully applied event: " + sse.eventName() + ": " + sse.data()) count = count + 1 # Make sure the background task is cancelled if still running. task.Cancel() # Examine the JSON database after applying events.. jsonDb.put_EmitCompact(False) print("----") print(jsonDb.emit()) |
© 2000-2024 Chilkat Software, Inc. All Rights Reserved.